From a562bcc623dee9817f5d07dd26ced443d0c0d68b Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 25 Apr 2018 10:36:28 +0200 Subject: [PATCH 1/4] Watcher: Make start/stop cycle more predictable The current implementation starts/stops watcher using an executor. This can result in out-of-order operations. This commit reduces those executor calls to an absolute minimum in order to be able to do state changes within the cluster state listener method, which runs in sequence. When a state change occurs that forces the watcher service to pause (like no watcher index, no master node, no local shards), the service is now in a paused state. Pausing is a super lightweight operation, which marks the ExecutionService as paused and waits for the currently executing watches to finish in the background via an executor. The same applies for stopping: the potentially long running operation is handed off to an executor, as waiting for executed watches is decoupled from the current state. The only other long running operation is starting, where watches need to be loaded. This is also done via an executor, but has an additional protection by checking the cluster state version it was started with. If another cluster state version was trying to load the watches, then this loading will not take effect. This PR also cleans up some unused states, like the simple boolean in the HistoryStore/TriggeredWatchStore marking it as started or stopped, as this can now be caught in the execution service. Another advantage of this approach is that only triggered watches are skipped while watcher is stopped, whereas watches that are run via the Execute Watch API will still be executed regardless of whether watcher is stopped or not. Lastly the TickerScheduleTriggerEngine thread now only starts on data nodes. 
--- .../xpack/test/rest/XPackRestIT.java | 31 +- .../elasticsearch/xpack/watcher/Watcher.java | 4 +- .../watcher/WatcherIndexingListener.java | 4 +- .../watcher/WatcherLifeCycleService.java | 237 ++---- .../xpack/watcher/WatcherService.java | 263 ++++--- .../watcher/execution/ExecutionService.java | 136 ++-- .../execution/TriggeredWatchStore.java | 62 +- .../xpack/watcher/history/HistoryStore.java | 46 +- .../stats/TransportWatcherStatsAction.java | 11 +- .../engine/TickerScheduleTriggerEngine.java | 55 +- .../watcher/WatcherLifeCycleServiceTests.java | 676 +++++++----------- .../xpack/watcher/WatcherServiceTests.java | 23 +- .../execution/ExecutionServiceTests.java | 5 +- .../execution/TriggeredWatchStoreTests.java | 82 +-- .../watcher/history/HistoryStoreTests.java | 28 +- .../test/integration/BootStrapTests.java | 12 +- .../test/integration/WatchAckTests.java | 4 - .../TransportWatcherStatsActionTests.java | 16 +- .../SmokeTestWatcherTestSuiteIT.java | 86 ++- 19 files changed, 763 insertions(+), 1018 deletions(-) diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java index dcca2677f2ce2..b5f2003bbd2de 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java @@ -97,17 +97,26 @@ private void waitForWatcher() throws Exception { // ensure watcher is started, so that a test can stop watcher and everything still works fine if (isWatcherTest()) { assertBusy(() -> { - try { - ClientYamlTestResponse response = - getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap()); - String state = (String) response.evaluate("stats.0.watcher_state"); - if ("started".equals(state) == false) { - getAdminExecutionContext().callApi("xpack.watcher.start", emptyMap(), emptyList(), emptyMap()); - } - // assertion 
required to exit the assertBusy lambda - assertThat(state, is("started")); - } catch (IOException e) { - throw new AssertionError(e); + ClientYamlTestResponse response = + getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap()); + String state = (String) response.evaluate("stats.0.watcher_state"); + + switch (state) { + case "stopped": + ClientYamlTestResponse startResponse = + getAdminExecutionContext().callApi("xpack.watcher.start", emptyMap(), emptyList(), emptyMap()); + boolean isAcknowledged = (boolean) startResponse.evaluate("acknowledged"); + assertThat(isAcknowledged, is(true)); + break; + case "stopping": + throw new AssertionError("waiting until stopping state reached stopped state to start again"); + case "starting": + throw new AssertionError("waiting until starting state reached started state"); + case "started": + // all good here, we are done + break; + default: + throw new AssertionError("unknown state[" + state + "]"); } }); } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index 57fcff7671518..6c4ac1994ffce 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -351,7 +351,7 @@ public Collection createComponents(Client client, ClusterService cluster final WatchParser watchParser = new WatchParser(settings, triggerService, registry, inputRegistry, cryptoService, getClock()); final ExecutionService executionService = new ExecutionService(settings, historyStore, triggeredWatchStore, watchExecutor, - getClock(), watchParser, clusterService, client); + getClock(), watchParser, clusterService, client, threadPool.generic()); final Consumer> triggerEngineListener = getTriggerEngineListener(executionService); triggerService.register(triggerEngineListener); @@ 
-360,7 +360,7 @@ public Collection createComponents(Client client, ClusterService cluster watchParser, client); final WatcherLifeCycleService watcherLifeCycleService = - new WatcherLifeCycleService(settings, threadPool, clusterService, watcherService); + new WatcherLifeCycleService(settings, clusterService, watcherService); listener = new WatcherIndexingListener(settings, watchParser, getClock(), triggerService); clusterService.addListener(listener); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java index 37836ca94f8c0..8e0fbcb7cb4fc 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java @@ -123,10 +123,10 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) { boolean shouldBeTriggered = shardAllocationConfiguration.shouldBeTriggered(watch.id()); if (shouldBeTriggered) { if (watch.status().state().isActive()) { - logger.debug("adding watch [{}] to trigger", watch.id()); + logger.debug("adding watch [{}] to trigger service", watch.id()); triggerService.add(watch); } else { - logger.debug("removing watch [{}] to trigger", watch.id()); + logger.debug("removing watch [{}] to trigger service", watch.id()); triggerService.remove(watch.id()); } } else { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index bec496068e3a7..eef9e019b7a7e 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -21,29 +21,19 @@ import 
org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.gateway.GatewayService; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.upgrade.UpgradeField; import org.elasticsearch.xpack.core.watcher.WatcherMetaData; import org.elasticsearch.xpack.core.watcher.WatcherState; -import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField; import org.elasticsearch.xpack.core.watcher.watch.Watch; -import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry; import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils; import java.util.Collections; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.stream.Collectors; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; -import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener { @@ -54,30 +44,14 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste public static final Setting SETTING_REQUIRE_MANUAL_START = Setting.boolSetting("xpack.watcher.require_manual_start", false, Property.NodeScope); - private static final String LIFECYCLE_THREADPOOL_NAME = "watcher-lifecycle"; - - private final WatcherService watcherService; - private final ExecutorService executor; - private AtomicReference> previousAllocationIds = new AtomicReference<>(Collections.emptyList()); - private volatile boolean shutDown = false; 
// indicates that the node has been shutdown and we should never start watcher after this. + private final AtomicReference state = new AtomicReference<>(WatcherState.STARTED); + private final AtomicReference> previousAllocationIds = new AtomicReference<>(Collections.emptyList()); private final boolean requireManualStart; + private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this. + private volatile WatcherService watcherService; - WatcherLifeCycleService(Settings settings, ThreadPool threadPool, ClusterService clusterService, - WatcherService watcherService) { - // use a single thread executor so that lifecycle changes are handled in the order they - // are submitted in - this(settings, clusterService, watcherService, EsExecutors.newFixed( - LIFECYCLE_THREADPOOL_NAME, - 1, - 1000, - daemonThreadFactory(settings, LIFECYCLE_THREADPOOL_NAME), - threadPool.getThreadContext())); - } - - WatcherLifeCycleService(Settings settings, ClusterService clusterService, - WatcherService watcherService, ExecutorService executorService) { + WatcherLifeCycleService(Settings settings, ClusterService clusterService, WatcherService watcherService) { super(settings); - this.executor = executorService; this.watcherService = watcherService; this.requireManualStart = SETTING_REQUIRE_MANUAL_START.get(settings); clusterService.addListener(this); @@ -91,58 +65,12 @@ public void beforeStop() { }); } - public synchronized void stop(String reason) { - watcherService.stop(reason); - } - synchronized void shutDown() { + this.state.set(WatcherState.STOPPING); shutDown = true; - stop("shutdown initiated"); - stopExecutor(); - } - - void stopExecutor() { - ThreadPool.terminate(executor, 10L, TimeUnit.SECONDS); - } - - private synchronized void start(ClusterState state) { - if (shutDown) { - return; - } - final WatcherState watcherState = watcherService.state(); - if (watcherState != WatcherState.STOPPED) { - logger.debug("not 
starting watcher. watcher can only start if its current state is [{}], but its current state now is [{}]", - WatcherState.STOPPED, watcherState); - return; - } - - // If we start from a cluster state update we need to check if previously we stopped manually - // otherwise Watcher would start upon the next cluster state update while the user instructed Watcher to not run - WatcherMetaData watcherMetaData = state.getMetaData().custom(WatcherMetaData.TYPE); - if (watcherMetaData != null && watcherMetaData.manuallyStopped()) { - logger.debug("not starting watcher. watcher was stopped manually and therefore cannot be auto-started"); - return; - } - - // ensure that templates are existing before starting watcher - // the watcher index template registry is independent from watcher being started or stopped - if (WatcherIndexTemplateRegistry.validate(state) == false) { - logger.debug("not starting watcher, watcher templates are missing in the cluster state"); - return; - } - - if (watcherService.validate(state)) { - logger.trace("starting... (based on cluster state version [{}])", state.getVersion()); - try { - // we need to populate the allocation ids before the next cluster state listener comes in - checkAndSetAllocationIds(state, false); - watcherService.start(state); - } catch (Exception e) { - logger.warn("failed to start watcher. please wait for the cluster to become ready or try to start Watcher manually", e); - } - } else { - logger.debug("not starting watcher. 
because the cluster isn't ready yet to run watcher"); - } + clearAllocationIds(); + watcherService.shutDown(); + this.state.set(WatcherState.STOPPED); } /** @@ -169,77 +97,43 @@ public void clusterChanged(ClusterChangedEvent event) { } if (Strings.isNullOrEmpty(event.state().nodes().getMasterNodeId())) { - clearAllocationIds(); - executor.execute(() -> this.stop("no master node")); + pauseExecution("no master node"); return; } if (event.state().getBlocks().hasGlobalBlock(ClusterBlockLevel.WRITE)) { - clearAllocationIds(); - executor.execute(() -> this.stop("write level cluster block")); + pauseExecution("write level cluster block"); return; } - if (isWatcherStoppedManually(event.state())) { - clearAllocationIds(); - executor.execute(() -> this.stop("watcher manually marked to shutdown by cluster state update")); - } else { - final WatcherState watcherState = watcherService.state(); - if (watcherState == WatcherState.STARTED && event.state().nodes().getLocalNode().isDataNode()) { - checkAndSetAllocationIds(event.state(), true); - } else if (watcherState != WatcherState.STARTED && watcherState != WatcherState.STARTING) { - IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData()); - IndexMetaData triggeredWatchesIndexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStoreField.INDEX_NAME, - event.state().metaData()); - boolean isIndexInternalFormatWatchIndex = watcherIndexMetaData == null || - UpgradeField.checkInternalIndexFormat(watcherIndexMetaData); - boolean isIndexInternalFormatTriggeredWatchIndex = triggeredWatchesIndexMetaData == null || - UpgradeField.checkInternalIndexFormat(triggeredWatchesIndexMetaData); - if (isIndexInternalFormatTriggeredWatchIndex && isIndexInternalFormatWatchIndex) { - checkAndSetAllocationIds(event.state(), false); - executor.execute(() -> start(event.state())); - } else { - logger.warn("not starting watcher, upgrade API run required: .watches[{}], .triggered_watches[{}]", - 
isIndexInternalFormatWatchIndex, isIndexInternalFormatTriggeredWatchIndex); - } - } + boolean isWatcherStoppedManually = isWatcherStoppedManually(event.state()); + // if this is not a data node, we need to start it ourselves possibly + if (event.state().nodes().getLocalNode().isDataNode() == false && + isWatcherStoppedManually == false && this.state.get() == WatcherState.STOPPED) { + watcherService.start(event.state()); + this.state.set(WatcherState.STARTED); + return; } - } - - /** - * check if watcher has been stopped manually via the stop API - */ - private boolean isWatcherStoppedManually(ClusterState state) { - WatcherMetaData watcherMetaData = state.getMetaData().custom(WatcherMetaData.TYPE); - return watcherMetaData != null && watcherMetaData.manuallyStopped(); - } - /** - * check and optionally set the current allocation ids - * - * @param state the current cluster state - * @param callWatcherService should the watcher service be called for starting/stopping/reloading or should this be treated as a - * dryrun so that the caller is responsible for this - */ - private void checkAndSetAllocationIds(ClusterState state, boolean callWatcherService) { - IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, state.metaData()); - if (watcherIndexMetaData == null) { - if (clearAllocationIds() && callWatcherService) { - executor.execute(wrapWatcherService(() -> watcherService.pauseExecution("no watcher index found"), - e -> logger.error("error pausing watch execution", e))); + if (isWatcherStoppedManually) { + if (this.state.get() == WatcherState.STARTED) { + clearAllocationIds(); + watcherService.stop("watcher manually marked to shutdown by cluster state update"); + this.state.set(WatcherState.STOPPED); } return; } - DiscoveryNode localNode = state.nodes().getLocalNode(); - RoutingNode routingNode = state.getRoutingNodes().node(localNode.getId()); - // this can happen if the node does not hold any data + DiscoveryNode localNode = 
event.state().nodes().getLocalNode(); + RoutingNode routingNode = event.state().getRoutingNodes().node(localNode.getId()); if (routingNode == null) { - if (clearAllocationIds() && callWatcherService) { - executor.execute(wrapWatcherService( - () -> watcherService.pauseExecution("no routing node for local node found, network issue?"), - e -> logger.error("error pausing watch execution", e))); - } + pauseExecution("routing node in cluster state undefined. network issue?"); + return; + } + + IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData()); + if (watcherIndexMetaData == null) { + pauseExecution("no watcher index found"); return; } @@ -247,28 +141,48 @@ private void checkAndSetAllocationIds(ClusterState state, boolean callWatcherSer List localShards = routingNode.shardsWithState(watchIndex, RELOCATING, STARTED); // no local shards, empty out watcher and dont waste resources! if (localShards.isEmpty()) { - if (clearAllocationIds() && callWatcherService) { - executor.execute(wrapWatcherService(() -> watcherService.pauseExecution("no local watcher shards found"), - e -> logger.error("error pausing watch execution", e))); - } + pauseExecution("no local watcher shards found"); return; } List currentAllocationIds = localShards.stream() - .map(ShardRouting::allocationId) - .map(AllocationId::getId) - .collect(Collectors.toList()); - Collections.sort(currentAllocationIds); + .map(ShardRouting::allocationId) + .map(AllocationId::getId) + .sorted() + .collect(Collectors.toList()); if (previousAllocationIds.get().equals(currentAllocationIds) == false) { - previousAllocationIds.set(Collections.unmodifiableList(currentAllocationIds)); - if (callWatcherService) { - executor.execute(wrapWatcherService(() -> watcherService.reload(state, "new local watcher shard allocation ids"), - e -> logger.error("error reloading watcher", e))); + if (watcherService.validate(event.state())) { + 
previousAllocationIds.set(Collections.unmodifiableList(currentAllocationIds)); + if (state.get() == WatcherState.STARTED) { + watcherService.reload(event.state(), "new local watcher shard allocation ids"); + } else if (state.get() == WatcherState.STOPPED) { + watcherService.start(event.state()); + this.state.set(WatcherState.STARTED); + } + } else { + clearAllocationIds(); + this.state.set(WatcherState.STOPPED); } } } + private void pauseExecution(String reason) { + if (clearAllocationIds()) { + watcherService.pauseExecution(reason); + } + this.state.set(WatcherState.STARTED); + } + + /** + * check if watcher has been stopped manually via the stop API + */ + private boolean isWatcherStoppedManually(ClusterState state) { + WatcherMetaData watcherMetaData = state.getMetaData().custom(WatcherMetaData.TYPE); + return watcherMetaData != null && watcherMetaData.manuallyStopped(); + } + + /** /** * clear out current allocation ids if not already happened * @return true, if existing allocation ids were cleaned out, false otherwise @@ -283,26 +197,7 @@ List allocationIds() { return previousAllocationIds.get(); } - /** - * Wraps an abstract runnable to easier supply onFailure and doRun methods via lambdas - * This ensures that the uncaught exception handler in the executing threadpool does not get called - * - * @param run The code to be executed in the runnable - * @param exceptionConsumer The exception handling code to be executed, if the runnable fails - * @return The AbstractRunnable instance to pass to the executor - */ - private static AbstractRunnable wrapWatcherService(Runnable run, Consumer exceptionConsumer) { - - return new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - exceptionConsumer.accept(e); - } - - @Override - protected void doRun() throws Exception { - run.run(); - } - }; + public WatcherState getState() { + return state.get(); } } diff --git 
a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index 56c56baae8944..d280a150e8d7b 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -7,7 +7,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; @@ -26,16 +25,22 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortBuilders; -import org.elasticsearch.xpack.core.watcher.WatcherState; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.upgrade.UpgradeField; +import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.execution.ExecutionService; import org.elasticsearch.xpack.watcher.execution.TriggeredWatch; import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore; +import org.elasticsearch.xpack.watcher.history.HistoryStore; +import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry; import 
org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.watch.WatchParser; import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils; @@ -48,19 +53,24 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.stream.Collectors; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; +import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin; import static org.elasticsearch.xpack.core.watcher.support.Exceptions.illegalState; import static org.elasticsearch.xpack.core.watcher.watch.Watch.INDEX; - public class WatcherService extends AbstractComponent { + private static final String LIFECYCLE_THREADPOOL_NAME = "watcher-lifecycle"; + private final TriggerService triggerService; private final TriggeredWatchStore triggeredWatchStore; private final ExecutionService executionService; @@ -68,12 +78,12 @@ public class WatcherService extends AbstractComponent { private final int scrollSize; private final WatchParser parser; private final Client client; - // package-private for testing - final AtomicReference state = new AtomicReference<>(WatcherState.STOPPED); private final TimeValue defaultSearchTimeout; + private final AtomicLong processedClusterStateVersion = new AtomicLong(0); + private final ExecutorService executor; - public WatcherService(Settings settings, TriggerService triggerService, TriggeredWatchStore triggeredWatchStore, - ExecutionService executionService, WatchParser parser, Client client) { + 
WatcherService(Settings settings, TriggerService triggerService, TriggeredWatchStore triggeredWatchStore, + ExecutionService executionService, WatchParser parser, Client client, ExecutorService executor) { super(settings); this.triggerService = triggerService; this.triggeredWatchStore = triggeredWatchStore; @@ -83,108 +93,137 @@ public WatcherService(Settings settings, TriggerService triggerService, Triggere this.defaultSearchTimeout = settings.getAsTime("xpack.watcher.internal.ops.search.default_timeout", TimeValue.timeValueSeconds(30)); this.parser = parser; this.client = client; + this.executor = executor; + } + + WatcherService(Settings settings, TriggerService triggerService, TriggeredWatchStore triggeredWatchStore, + ExecutionService executionService, WatchParser parser, Client client) { + this(settings, triggerService, triggeredWatchStore, executionService, parser, client, + EsExecutors.newFixed(LIFECYCLE_THREADPOOL_NAME, 1, 1000, daemonThreadFactory(settings, LIFECYCLE_THREADPOOL_NAME), + client.threadPool().getThreadContext())); } /** - * Ensure that watcher can be started, by checking if all indices are marked as up and ready in the cluster state + * Ensure that watcher can be reloaded, by checking if all indices are marked as up and ready in the cluster state * @param state The current cluster state * @return true if everything is good to go, so that the service can be started */ public boolean validate(ClusterState state) { - boolean executionServiceValid = executionService.validate(state); - if (executionServiceValid) { - try { - IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, state.metaData()); - // no watch index yet means we are good to go - if (indexMetaData == null) { - return true; - } else { - if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { - logger.debug("watch index [{}] is marked as closed, watcher cannot be started", indexMetaData.getIndex().getName()); - return false; - } else { - return 
state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive(); - } - } - } catch (IllegalStateException e) { - logger.trace((Supplier) () -> new ParameterizedMessage("error getting index meta data [{}]: ", Watch.INDEX), e); - return false; - } + // template check makes only sense for non existing indices, we could refine this + boolean hasValidWatcherTemplates = WatcherIndexTemplateRegistry.validate(state); + if (hasValidWatcherTemplates == false) { + logger.debug("missing watcher index templates, not starting watcher service"); + return false; } - return false; - } - - public void start(ClusterState clusterState) throws Exception { - // starting already triggered, exit early - WatcherState currentState = state.get(); - if (currentState == WatcherState.STARTING || currentState == WatcherState.STARTED) { - throw new IllegalStateException("watcher is already in state ["+ currentState +"]"); + IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, state.metaData()); + IndexMetaData triggeredWatchesIndexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStoreField.INDEX_NAME, + state.metaData()); + boolean isIndexInternalFormatWatchIndex = watcherIndexMetaData == null || + UpgradeField.checkInternalIndexFormat(watcherIndexMetaData); + boolean isIndexInternalFormatTriggeredWatchIndex = triggeredWatchesIndexMetaData == null || + UpgradeField.checkInternalIndexFormat(triggeredWatchesIndexMetaData); + if (isIndexInternalFormatTriggeredWatchIndex == false || isIndexInternalFormatWatchIndex == false) { + logger.warn("not starting watcher, upgrade API run required: .watches[{}], .triggered_watches[{}]", + isIndexInternalFormatWatchIndex, isIndexInternalFormatTriggeredWatchIndex); + return false; } - if (state.compareAndSet(WatcherState.STOPPED, WatcherState.STARTING)) { - try { - logger.debug("starting watch service..."); - - executionService.start(); - Collection watches = loadWatches(clusterState); - 
triggerService.start(watches); - - Collection triggeredWatches = triggeredWatchStore.findTriggeredWatches(watches, clusterState); - executionService.executeTriggeredWatches(triggeredWatches); - - state.set(WatcherState.STARTED); - logger.debug("watch service has started"); - } catch (Exception e) { - state.set(WatcherState.STOPPED); - throw e; + try { + boolean storesValid = TriggeredWatchStore.validate(state) && HistoryStore.validate(state); + if (storesValid == false) { + return false; } - } else { - logger.debug("could not transition state from stopped to starting, current state [{}]", state.get()); + + return watcherIndexMetaData == null || (watcherIndexMetaData.getState() == IndexMetaData.State.OPEN && + state.routingTable().index(watcherIndexMetaData.getIndex()).allPrimaryShardsActive()); + } catch (IllegalStateException e) { + logger.debug("error validating to start watcher", e); + return false; } } /** - * Stops the watcher service and it's subservices. Should only be called, when watcher is stopped manually + * Stops the watcher service and marks its services as paused */ public void stop(String reason) { - WatcherState currentState = state.get(); - if (currentState == WatcherState.STOPPING || currentState == WatcherState.STOPPED) { - logger.trace("watcher is already in state [{}] not stopping", currentState); - } else { - try { - if (state.compareAndSet(WatcherState.STARTED, WatcherState.STOPPING)) { - logger.info("stopping watch service, reason [{}]", reason); - triggerService.stop(); - executionService.stop(); - state.set(WatcherState.STOPPED); - logger.debug("watch service has stopped"); - } else { - logger.debug("could not transition state from started to stopping, current state [{}]", state.get()); - } - } catch (Exception e) { - state.set(WatcherState.STOPPED); - logger.error("Error stopping watcher", e); - } - } + logger.info("stopping watch service, reason [{}]", reason); + executionService.pause(); + triggerService.pauseExecution(); + } + + /** + 
* shuts down the trigger service as well to make sure there are no lingering threads + * also no need to check anything, as this is final, we just can go to status STOPPED + */ + void shutDown() { + logger.info("stopping watch service, reason [shutdown initiated]"); + executionService.pause(); + triggerService.stop(); + stopExecutor(); + logger.debug("watch service has stopped"); } + void stopExecutor() { + ThreadPool.terminate(executor, 10L, TimeUnit.SECONDS); + } /** * Reload the watcher service, does not switch the state from stopped to started, just keep going - * @param clusterState cluster state, which is needed to find out about local shards + * @param state cluster state, which is needed to find out about local shards */ - public void reload(ClusterState clusterState, String reason) { + void reload(ClusterState state, String reason) { + // this method contains the only async code block, being called by the cluster state listener + // the reason for this is, that loading he watches is done in a sync manner and thus cannot be done on the cluster state listener + // thread + // + // this method itself is called by the cluster state listener, so will never be called in parallel + // setting the cluster state version allows us to know if the async method has been overtaken by another async method + // this is unlikely, but can happen, if the thread pool schedules two of those runnables at the same time + // by checking the cluster state version before and after loading the watches we can potentially just exit without applying the + // changes + processedClusterStateVersion.set(state.getVersion()); pauseExecution(reason); + triggerService.pauseExecution(); + + executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false), + e -> logger.error("error reloading watcher", e))); + } + + public void start(ClusterState state) { + processedClusterStateVersion.set(state.getVersion()); + executor.execute(wrapWatcherService(() -> reloadInner(state, "starting", 
true), + e -> logger.error("error starting watcher", e))); + } + + /** + * reload the watches and start scheduling them + */ + private synchronized void reloadInner(ClusterState state, String reason, boolean loadTriggeredWatches) { + // exit early if another thread has come in between + if (processedClusterStateVersion.get() != state.getVersion()) { + logger.debug("watch service has not been reloaded for state [{}], another reload for state [{}] in progress", + state.getVersion(), processedClusterStateVersion.get()); + } - // load watches - Collection watches = loadWatches(clusterState); - watches.forEach(triggerService::add); + Collection watches = loadWatches(state); + Collection triggeredWatches = Collections.emptyList(); + if (loadTriggeredWatches) { + triggeredWatches = triggeredWatchStore.findTriggeredWatches(watches, state); + } - // then load triggered watches, which might have been in the queue that we just cleared, - // maybe we dont need to execute those anymore however, i.e. due to shard shuffling - // then someone else will - Collection triggeredWatches = triggeredWatchStore.findTriggeredWatches(watches, clusterState); - executionService.executeTriggeredWatches(triggeredWatches); + // if we had another state coming in the meantime, we will not start the trigger engines with these watches, but wait + // until the others are loaded + if (processedClusterStateVersion.get() == state.getVersion()) { + executionService.unPause(); + triggerService.start(watches); + if (triggeredWatches.isEmpty() == false) { + executionService.executeTriggeredWatches(triggeredWatches); + } + logger.debug("watch service has been reloaded, reason [{}]", reason); + } else { + logger.debug("watch service has not been reloaded for state [{}], another reload for state [{}] in progress", + state.getVersion(), processedClusterStateVersion.get()); + } } /** @@ -192,8 +231,7 @@ public void reload(ClusterState clusterState, String reason) { * manual watch execution, i.e. 
via the execute watch API */ public void pauseExecution(String reason) { - int cancelledTaskCount = executionService.pauseExecution(); - triggerService.pauseExecution(); + int cancelledTaskCount = executionService.pause(); logger.info("paused watch execution, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); } @@ -212,7 +250,7 @@ private Collection loadWatches(ClusterState clusterState) { List watches = new ArrayList<>(); try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(INDEX)) - .actionGet(TimeValue.timeValueSeconds(5)); + .actionGet(TimeValue.timeValueSeconds(5)); if (refreshResponse.getSuccessfulShards() < indexMetaData.getNumberOfShards()) { throw illegalState("not all required shards have been refreshed"); } @@ -230,12 +268,12 @@ private Collection loadWatches(ClusterState clusterState) { List watchIndexShardRoutings = clusterState.getRoutingTable().allShards(watchIndexName); SearchRequest searchRequest = new SearchRequest(INDEX) - .scroll(scrollTimeout) - .preference(Preference.ONLY_LOCAL.toString()) - .source(new SearchSourceBuilder() - .size(scrollSize) - .sort(SortBuilders.fieldSort("_doc")) - .version(true)); + .scroll(scrollTimeout) + .preference(Preference.ONLY_LOCAL.toString()) + .source(new SearchSourceBuilder() + .size(scrollSize) + .sort(SortBuilders.fieldSort("_doc")) + .version(true)); response = client.search(searchRequest).actionGet(defaultSearchTimeout); if (response.getTotalShards() != response.getSuccessfulShards()) { @@ -249,11 +287,11 @@ private Collection loadWatches(ClusterState clusterState) { Map> sortedShards = new HashMap<>(localShards.size()); for (ShardRouting localShardRouting : localShards) { List sortedAllocationIds = watchIndexShardRoutings.stream() - .filter(sr -> localShardRouting.getId() == sr.getId()) - 
.map(ShardRouting::allocationId).filter(Objects::nonNull) - .map(AllocationId::getId).filter(Objects::nonNull) - .sorted() - .collect(Collectors.toList()); + .filter(sr -> localShardRouting.getId() == sr.getId()) + .map(ShardRouting::allocationId).filter(Objects::nonNull) + .map(AllocationId::getId).filter(Objects::nonNull) + .sorted() + .collect(Collectors.toList()); sortedShards.put(localShardRouting.getId(), sortedAllocationIds); } @@ -262,8 +300,8 @@ private Collection loadWatches(ClusterState clusterState) { for (SearchHit hit : response.getHits()) { // find out if this hit should be processed locally Optional correspondingShardOptional = localShards.stream() - .filter(sr -> sr.shardId().equals(hit.getShard().getShardId())) - .findFirst(); + .filter(sr -> sr.shardId().equals(hit.getShard().getShardId())) + .findFirst(); if (correspondingShardOptional.isPresent() == false) { continue; } @@ -284,7 +322,8 @@ private Collection loadWatches(ClusterState clusterState) { watches.add(watch); } } catch (Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("couldn't load watch [{}], ignoring it...", id), e); + logger.error((org.apache.logging.log4j.util.Supplier) + () -> new ParameterizedMessage("couldn't load watch [{}], ignoring it...", id), e); } } SearchScrollRequest request = new SearchScrollRequest(response.getScrollId()); @@ -320,7 +359,25 @@ private boolean parseWatchOnThisNode(String id, int totalShardCount, int index) return shardIndex == index; } - public WatcherState state() { - return state.get(); + /** + * Wraps an abstract runnable to easier supply onFailure and doRun methods via lambdas + * This ensures that the uncaught exception handler in the executing threadpool does not get called + * + * @param run The code to be executed in the runnable + * @param exceptionConsumer The exception handling code to be executed, if the runnable fails + * @return The AbstractRunnable instance to pass to the executor + */ + private static 
AbstractRunnable wrapWatcherService(Runnable run, Consumer exceptionConsumer) { + return new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + exceptionConsumer.accept(e); + } + + @Override + protected void doRun() throws Exception { + run.run(); + } + }; } } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 29ef6e03f6d4d..ae6854833b827 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.watcher.execution; +import com.google.common.collect.Iterables; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ExceptionsHelper; @@ -17,7 +18,6 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.Preference; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.MapBuilder; @@ -64,8 +64,10 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin; @@ -75,27 +77,30 @@ public class ExecutionService extends AbstractComponent { public static final Setting DEFAULT_THROTTLE_PERIOD_SETTING = 
Setting.positiveTimeSetting("xpack.watcher.execution.default_throttle_period", - TimeValue.timeValueSeconds(5), Setting.Property.NodeScope); + TimeValue.timeValueSeconds(5), Setting.Property.NodeScope); private final MeanMetric totalExecutionsTime = new MeanMetric(); private final Map actionByTypeExecutionTime = new HashMap<>(); + private final TimeValue defaultThrottlePeriod; + private final TimeValue maxStopTimeout; + private final TimeValue indexDefaultTimeout; + private final HistoryStore historyStore; private final TriggeredWatchStore triggeredWatchStore; - private final WatchExecutor executor; private final Clock clock; - private final TimeValue defaultThrottlePeriod; - private final TimeValue maxStopTimeout; private final WatchParser parser; private final ClusterService clusterService; private final Client client; - private final TimeValue indexDefaultTimeout; + private final WatchExecutor executor; + private final ExecutorService genericExecutor; - private volatile CurrentExecutions currentExecutions; - private final AtomicBoolean started = new AtomicBoolean(false); + private AtomicReference currentExecutions = new AtomicReference<>(); + private final AtomicBoolean paused = new AtomicBoolean(false); public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredWatchStore triggeredWatchStore, WatchExecutor executor, - Clock clock, WatchParser parser, ClusterService clusterService, Client client) { + Clock clock, WatchParser parser, ClusterService clusterService, Client client, + ExecutorService genericExecutor) { super(settings); this.historyStore = historyStore; this.triggeredWatchStore = triggeredWatchStore; @@ -106,52 +111,21 @@ public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredW this.parser = parser; this.clusterService = clusterService; this.client = client; + this.genericExecutor = genericExecutor; this.indexDefaultTimeout = settings.getAsTime("xpack.watcher.internal.ops.index.default_timeout", 
TimeValue.timeValueSeconds(30)); + this.currentExecutions.set(new CurrentExecutions()); } - public synchronized void start() throws Exception { - if (started.get()) { - return; - } - - assert executor.queue().isEmpty() : "queue should be empty, but contains " + executor.queue().size() + " elements."; - if (started.compareAndSet(false, true)) { - try { - logger.debug("starting execution service"); - historyStore.start(); - triggeredWatchStore.start(); - currentExecutions = new CurrentExecutions(); - logger.debug("started execution service"); - } catch (Exception e) { - started.set(false); - throw e; - } - } - } - - public boolean validate(ClusterState state) { - return triggeredWatchStore.validate(state) && HistoryStore.validate(state); - } - - public synchronized void stop() { - if (started.compareAndSet(true, false)) { - logger.debug("stopping execution service"); - // We could also rely on the shutdown in #updateSettings call, but - // this is a forceful shutdown that also interrupts the worker threads in the thread pool - int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>()); - - this.clearExecutions(); - triggeredWatchStore.stop(); - historyStore.stop(); - logger.debug("stopped execution service, cancelled [{}] queued tasks", cancelledTaskCount); - } + public synchronized void unPause() { + paused.set(false); } /** * Pause the execution of the watcher executor * @return the number of tasks that have been removed */ - public synchronized int pauseExecution() { + public int pause() { + paused.set(true); int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>()); this.clearExecutions(); return cancelledTaskCount; @@ -171,12 +145,12 @@ public long executionThreadPoolMaxSize() { // for testing only CurrentExecutions getCurrentExecutions() { - return currentExecutions; + return currentExecutions.get(); } public List currentExecutions() { List currentExecutions = new ArrayList<>(); - for (WatchExecution watchExecution : this.currentExecutions) 
{ + for (WatchExecution watchExecution : this.currentExecutions.get()) { currentExecutions.add(watchExecution.createSnapshot()); } // Lets show the longest running watch first: @@ -203,26 +177,28 @@ public List queuedWatches() { } void processEventsAsync(Iterable events) throws Exception { - if (!started.get()) { - throw new IllegalStateException("not started"); + if (paused.get()) { + logger.debug("watcher execution service paused, not processing [{}] events", Iterables.size(events)); + return; } Tuple, List> watchesAndContext = createTriggeredWatchesAndContext(events); List triggeredWatches = watchesAndContext.v1(); triggeredWatchStore.putAll(triggeredWatches, ActionListener.wrap( - response -> executeTriggeredWatches(response, watchesAndContext), - e -> { - Throwable cause = ExceptionsHelper.unwrapCause(e); - if (cause instanceof EsRejectedExecutionException) { - logger.debug("failed to store watch records due to filled up watcher threadpool"); - } else { - logger.warn("failed to store watch records", e); - } - })); + response -> executeTriggeredWatches(response, watchesAndContext), + e -> { + Throwable cause = ExceptionsHelper.unwrapCause(e); + if (cause instanceof EsRejectedExecutionException) { + logger.debug("failed to store watch records due to filled up watcher threadpool"); + } else { + logger.warn("failed to store watch records", e); + } + })); } void processEventsSync(Iterable events) throws IOException { - if (!started.get()) { - throw new IllegalStateException("not started"); + if (paused.get()) { + logger.debug("watcher execution service paused, not processing [{}] events", Iterables.size(events)); + return; } Tuple, List> watchesAndContext = createTriggeredWatchesAndContext(events); List triggeredWatches = watchesAndContext.v1(); @@ -279,7 +255,7 @@ public WatchRecord execute(WatchExecutionContext ctx) { WatchRecord record = null; final String watchId = ctx.id().watchId(); try { - boolean executionAlreadyExists = currentExecutions.put(watchId, new 
WatchExecution(ctx, Thread.currentThread())); + boolean executionAlreadyExists = currentExecutions.get().put(watchId, new WatchExecution(ctx, Thread.currentThread())); if (executionAlreadyExists) { logger.trace("not executing watch [{}] because it is already queued", watchId); record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "Watch is already queued in thread pool"); @@ -336,7 +312,7 @@ record = createWatchRecord(record, ctx, e); logger.error((Supplier) () -> new ParameterizedMessage("failed to delete triggered watch [{}]", ctx.id()), e); } } - currentExecutions.remove(watchId); + currentExecutions.get().remove(watchId); logger.debug("finished [{}]/[{}]", watchId, ctx.id()); } return record; @@ -353,14 +329,14 @@ public void updateWatchStatus(Watch watch) throws IOException { // so we just need to update the watch itself // we do not want to update the status.state field, as it might have been deactivated inbetween Map parameters = MapBuilder.newMapBuilder() - .put(Watch.INCLUDE_STATUS_KEY, "true") - .put(WatchStatus.INCLUDE_STATE, "false") - .immutableMap(); + .put(Watch.INCLUDE_STATUS_KEY, "true") + .put(WatchStatus.INCLUDE_STATE, "false") + .immutableMap(); ToXContent.MapParams params = new ToXContent.MapParams(parameters); XContentBuilder source = JsonXContent.contentBuilder(). - startObject() - .field(WatchField.STATUS.getPreferredName(), watch.status(), params) - .endObject(); + startObject() + .field(WatchField.STATUS.getPreferredName(), watch.status(), params) + .endObject(); UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, watch.id()); updateRequest.doc(source); @@ -400,7 +376,6 @@ private void logWatchRecord(WatchExecutionContext ctx, Exception e) { The execution of an watch is split into two phases: 1. the trigger part which just makes sure to store the associated watch record in the history 2. 
the actual processing of the watch - The reason this split is that we don't want to lose the fact watch was triggered. This way, even if the thread pool that executes the watches is completely busy, we don't lose the fact that the watch was triggered (it'll have its history record) @@ -419,16 +394,16 @@ private void executeAsync(WatchExecutionContext ctx, final TriggeredWatch trigge } } catch (Exception exc) { logger.error((Supplier) () -> - new ParameterizedMessage("Error storing watch history record for watch [{}] after thread pool rejection", - triggeredWatch.id()), exc); + new ParameterizedMessage("Error storing watch history record for watch [{}] after thread pool rejection", + triggeredWatch.id()), exc); } try { triggeredWatchStore.delete(triggeredWatch.id()); } catch (Exception exc) { logger.error((Supplier) () -> - new ParameterizedMessage("Error deleting triggered watch store record for watch [{}] after thread pool " + - "rejection", triggeredWatch.id()), exc); + new ParameterizedMessage("Error deleting triggered watch store record for watch [{}] after thread pool " + + "rejection", triggeredWatch.id()), exc); } }; } @@ -494,15 +469,15 @@ public void executeTriggeredWatches(Collection triggeredWatches) GetResponse response = getWatch(triggeredWatch.id().watchId()); if (response.isExists() == false) { String message = "unable to find watch for record [" + triggeredWatch.id().watchId() + "]/[" + triggeredWatch.id() + - "], perhaps it has been deleted, ignoring..."; + "], perhaps it has been deleted, ignoring..."; WatchRecord record = new WatchRecord.MessageWatchRecord(triggeredWatch.id(), triggeredWatch.triggerEvent(), - ExecutionState.NOT_EXECUTED_WATCH_MISSING, message, clusterService.localNode().getId()); + ExecutionState.NOT_EXECUTED_WATCH_MISSING, message, clusterService.localNode().getId()); historyStore.forcePut(record); triggeredWatchStore.delete(triggeredWatch.id()); } else { DateTime now = new DateTime(clock.millis(), UTC); 
TriggeredExecutionContext ctx = new TriggeredExecutionContext(triggeredWatch.id().watchId(), now, - triggeredWatch.triggerEvent(), defaultThrottlePeriod, true); + triggeredWatch.triggerEvent(), defaultThrottlePeriod, true); executeAsync(ctx, triggeredWatch); counter++; } @@ -541,9 +516,10 @@ public Counters executionTimes() { * This clears out the current executions and sets new empty current executions * This is needed, because when this method is called, watcher keeps running, so sealing executions would be a bad idea */ - public synchronized void clearExecutions() { - currentExecutions.sealAndAwaitEmpty(maxStopTimeout); - currentExecutions = new CurrentExecutions(); + private void clearExecutions() { + final CurrentExecutions currentExecutionsBeforeSetting = currentExecutions.getAndSet(new CurrentExecutions()); + // clear old executions in background, no need to wait + genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout)); } // the watch execution task takes another runnable as parameter diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java index 35bc805fc59b8..e0164b5bdbd54 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java @@ -5,8 +5,6 @@ */ package org.elasticsearch.xpack.watcher.execution; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -24,7 +22,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import 
org.elasticsearch.cluster.routing.Preference; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -46,13 +43,11 @@ import java.util.Collections; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin; -import static org.elasticsearch.xpack.core.watcher.support.Exceptions.illegalState; public class TriggeredWatchStore extends AbstractComponent { @@ -61,7 +56,6 @@ public class TriggeredWatchStore extends AbstractComponent { private final TimeValue scrollTimeout; private final TriggeredWatch.Parser triggeredWatchParser; - private final AtomicBoolean started = new AtomicBoolean(false); private final TimeValue defaultBulkTimeout; private final TimeValue defaultSearchTimeout; @@ -73,36 +67,12 @@ public TriggeredWatchStore(Settings settings, Client client, TriggeredWatch.Pars this.defaultBulkTimeout = settings.getAsTime("xpack.watcher.internal.ops.bulk.default_timeout", TimeValue.timeValueSeconds(120)); this.defaultSearchTimeout = settings.getAsTime("xpack.watcher.internal.ops.search.default_timeout", TimeValue.timeValueSeconds(30)); this.triggeredWatchParser = triggeredWatchParser; - this.started.set(true); } - public void start() { - started.set(true); - } - - public boolean validate(ClusterState state) { - try { - IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStoreField.INDEX_NAME, state.metaData()); - if (indexMetaData == null) { - return true; - } else { - if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { - 
logger.debug("triggered watch index [{}] is marked as closed, watcher cannot be started", - indexMetaData.getIndex().getName()); - return false; - } else { - return state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive(); - } - } - } catch (IllegalStateException e) { - logger.trace((Supplier) () -> new ParameterizedMessage("error getting index meta data [{}]: ", - TriggeredWatchStoreField.INDEX_NAME), e); - return false; - } - } - - public void stop() { - started.set(false); + public static boolean validate(ClusterState state) { + IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStoreField.INDEX_NAME, state.metaData()); + return indexMetaData == null || (indexMetaData.getState() == IndexMetaData.State.OPEN && + state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive()); } public void putAll(final List triggeredWatches, final ActionListener listener) throws IOException { @@ -111,9 +81,8 @@ public void putAll(final List triggeredWatches, final ActionList return; } - ensureStarted(); executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, createBulkRequest(triggeredWatches, - TriggeredWatchStoreField.DOC_TYPE), listener, client::bulk); + TriggeredWatchStoreField.DOC_TYPE), listener, client::bulk); } public BulkResponse putAll(final List triggeredWatches) throws IOException { @@ -144,7 +113,6 @@ private BulkRequest createBulkRequest(final List triggeredWatche } public void delete(Wid wid) { - ensureStarted(); DeleteRequest request = new DeleteRequest(TriggeredWatchStoreField.INDEX_NAME, TriggeredWatchStoreField.DOC_TYPE, wid.value()); try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { client.delete(request); // FIXME shouldn't we wait before saying the delete was successful @@ -152,12 +120,6 @@ public void delete(Wid wid) { logger.trace("successfully deleted triggered watch with id [{}]", wid); } - private void 
ensureStarted() { - if (!started.get()) { - throw illegalState("unable to persist triggered watches, the store is not ready"); - } - } - /** * Checks if any of the loaded watches has been put into the triggered watches index for immediate execution * @@ -180,7 +142,7 @@ public Collection findTriggeredWatches(Collection watches try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { client.admin().indices().refresh(new RefreshRequest(TriggeredWatchStoreField.INDEX_NAME)) - .actionGet(TimeValue.timeValueSeconds(5)); + .actionGet(TimeValue.timeValueSeconds(5)); } catch (IndexNotFoundException e) { return Collections.emptyList(); } @@ -189,12 +151,12 @@ public Collection findTriggeredWatches(Collection watches Collection triggeredWatches = new ArrayList<>(ids.size()); SearchRequest searchRequest = new SearchRequest(TriggeredWatchStoreField.INDEX_NAME) - .scroll(scrollTimeout) - .preference(Preference.LOCAL.toString()) - .source(new SearchSourceBuilder() - .size(scrollSize) - .sort(SortBuilders.fieldSort("_doc")) - .version(true)); + .scroll(scrollTimeout) + .preference(Preference.LOCAL.toString()) + .source(new SearchSourceBuilder() + .size(scrollSize) + .sort(SortBuilders.fieldSort("_doc")) + .version(true)); SearchResponse response = null; try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java index d226917c57459..64e909a2f73d8 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java @@ -29,7 +29,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -38,7 +37,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin; import static org.elasticsearch.xpack.core.watcher.support.Exceptions.ioException; -public class HistoryStore extends AbstractComponent { +public class HistoryStore extends AbstractComponent implements AutoCloseable { public static final String DOC_TYPE = "doc"; @@ -47,24 +46,17 @@ public class HistoryStore extends AbstractComponent { private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private final Lock putUpdateLock = readWriteLock.readLock(); private final Lock stopLock = readWriteLock.writeLock(); - private final AtomicBoolean started = new AtomicBoolean(false); public HistoryStore(Settings settings, Client client) { super(settings); this.client = client; } - public void start() { - started.set(true); - } - - public void stop() { - stopLock.lock(); //This will block while put or update actions are underway - try { - started.set(false); - } finally { - stopLock.unlock(); - } + @Override + public void close() { + // This will block while put or update actions are underway + stopLock.lock(); + stopLock.unlock(); } /** @@ -72,9 +64,6 @@ public void stop() { * If the specified watchRecord already was stored this call will fail with a version conflict. 
*/ public void put(WatchRecord watchRecord) throws Exception { - if (!started.get()) { - throw new IllegalStateException("unable to persist watch record history store is not ready"); - } String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()); putUpdateLock.lock(); try (XContentBuilder builder = XContentFactory.jsonBuilder(); @@ -82,8 +71,8 @@ public void put(WatchRecord watchRecord) throws Exception { watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS); IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()) - .source(builder) - .opType(IndexRequest.OpType.CREATE); + .source(builder) + .opType(IndexRequest.OpType.CREATE); client.index(request).actionGet(30, TimeUnit.SECONDS); logger.debug("indexed watch history record [{}]", watchRecord.id().value()); } catch (IOException ioe) { @@ -98,9 +87,6 @@ public void put(WatchRecord watchRecord) throws Exception { * Any existing watchRecord will be overwritten. */ public void forcePut(WatchRecord watchRecord) { - if (!started.get()) { - throw new IllegalStateException("unable to persist watch record history store is not ready"); - } String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()); putUpdateLock.lock(); try { @@ -109,17 +95,17 @@ public void forcePut(WatchRecord watchRecord) { watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS); IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()) - .source(builder) - .opType(IndexRequest.OpType.CREATE); + .source(builder) + .opType(IndexRequest.OpType.CREATE); client.index(request).get(30, TimeUnit.SECONDS); logger.debug("indexed watch history record [{}]", watchRecord.id().value()); } catch (VersionConflictEngineException vcee) { watchRecord = new WatchRecord.MessageWatchRecord(watchRecord, ExecutionState.EXECUTED_MULTIPLE_TIMES, - "watch record [{ " + watchRecord.id() + " }] has been stored before, previous state 
[" + watchRecord.state() + "]"); + "watch record [{ " + watchRecord.id() + " }] has been stored before, previous state [" + watchRecord.state() + "]"); try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder(); ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()) - .source(xContentBuilder.value(watchRecord)); + .source(xContentBuilder.value(watchRecord)); client.index(request).get(30, TimeUnit.SECONDS); } logger.debug("overwrote watch history record [{}]", watchRecord.id().value()); @@ -142,11 +128,7 @@ public void forcePut(WatchRecord watchRecord) { public static boolean validate(ClusterState state) { String currentIndex = HistoryStoreField.getHistoryIndexNameForTime(DateTime.now(DateTimeZone.UTC)); IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(currentIndex, state.metaData()); - if (indexMetaData == null) { - return true; - } else { - return indexMetaData.getState() == IndexMetaData.State.OPEN && - state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive(); - } + return indexMetaData == null || (indexMetaData.getState() == IndexMetaData.State.OPEN && + state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive()); } } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsAction.java index d7f8962756b7c..474057031e7da 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsAction.java @@ -19,6 +19,7 @@ import 
org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsAction; import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsRequest; import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse; +import org.elasticsearch.xpack.watcher.WatcherLifeCycleService; import org.elasticsearch.xpack.watcher.WatcherService; import org.elasticsearch.xpack.watcher.execution.ExecutionService; import org.elasticsearch.xpack.watcher.trigger.TriggerService; @@ -32,19 +33,19 @@ public class TransportWatcherStatsAction extends TransportNodesAction { - private final WatcherService watcherService; private final ExecutionService executionService; private final TriggerService triggerService; + private final WatcherLifeCycleService lifeCycleService; @Inject public TransportWatcherStatsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, WatcherService watcherService, + IndexNameExpressionResolver indexNameExpressionResolver, WatcherLifeCycleService lifeCycleService, ExecutionService executionService, TriggerService triggerService) { super(settings, WatcherStatsAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, WatcherStatsRequest::new, WatcherStatsRequest.Node::new, ThreadPool.Names.MANAGEMENT, WatcherStatsResponse.Node.class); - this.watcherService = watcherService; + this.lifeCycleService = lifeCycleService; this.executionService = executionService; this.triggerService = triggerService; } @@ -68,7 +69,7 @@ protected WatcherStatsResponse.Node newNodeResponse() { @Override protected WatcherStatsResponse.Node nodeOperation(WatcherStatsRequest.Node request) { WatcherStatsResponse.Node statsResponse = new WatcherStatsResponse.Node(clusterService.localNode()); - statsResponse.setWatcherState(watcherService.state()); + 
statsResponse.setWatcherState(lifeCycleService.getState()); statsResponse.setThreadPoolQueueSize(executionService.executionThreadPoolQueueSize()); statsResponse.setThreadPoolMaxSize(executionService.executionThreadPoolMaxSize()); if (request.includeCurrentWatches()) { @@ -92,4 +93,4 @@ private WatcherMetaData getWatcherMetaData() { } return watcherMetaData; } -} \ No newline at end of file +} diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java index 7e08f140dafae..de8ab1d1f4bc6 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java @@ -9,6 +9,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.node.Node; import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule; @@ -32,7 +33,7 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine { public static final Setting TICKER_INTERVAL_SETTING = - positiveTimeSetting("xpack.watcher.trigger.schedule.ticker.tick_interval", TimeValue.timeValueMillis(500), Property.NodeScope); + positiveTimeSetting("xpack.watcher.trigger.schedule.ticker.tick_interval", TimeValue.timeValueMillis(500), Property.NodeScope); private final TimeValue tickInterval; private volatile Map schedules; @@ -42,26 +43,31 @@ public TickerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleR super(settings, scheduleRegistry, clock); 
this.tickInterval = TICKER_INTERVAL_SETTING.get(settings); this.schedules = new ConcurrentHashMap<>(); + this.ticker = new Ticker(Node.NODE_DATA_SETTING.get(settings)); } @Override - public void start(Collection jobs) { - long starTime = clock.millis(); + public synchronized void start(Collection jobs) { + long startTime = clock.millis(); Map schedules = new ConcurrentHashMap<>(); for (Watch job : jobs) { if (job.trigger() instanceof ScheduleTrigger) { ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); - schedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), starTime)); + schedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), startTime)); } } - this.schedules = schedules; - this.ticker = new Ticker(); + this.schedules.putAll(schedules); } @Override public void stop() { + schedules.clear(); ticker.close(); - pauseExecution(); + } + + @Override + public synchronized void pauseExecution() { + schedules.clear(); } @Override @@ -71,11 +77,6 @@ public void add(Watch watch) { schedules.put(watch.id(), new ActiveSchedule(watch.id(), trigger.getSchedule(), clock.millis())); } - @Override - public void pauseExecution() { - schedules.clear(); - } - @Override public int getJobCount() { return schedules.size(); @@ -93,9 +94,9 @@ void checkJobs() { long scheduledTime = schedule.check(triggeredTime); if (scheduledTime > 0) { logger.debug("triggered job [{}] at [{}] (scheduled time was [{}])", schedule.name, - new DateTime(triggeredTime, UTC), new DateTime(scheduledTime, UTC)); + new DateTime(triggeredTime, UTC), new DateTime(scheduledTime, UTC)); events.add(new ScheduleTriggerEvent(schedule.name, new DateTime(triggeredTime, UTC), - new DateTime(scheduledTime, UTC))); + new DateTime(scheduledTime, UTC))); if (events.size() >= 1000) { notifyListeners(events); events.clear(); @@ -145,11 +146,15 @@ class Ticker extends Thread { private volatile boolean active = true; private final CountDownLatch closeLatch = new CountDownLatch(1); + 
private boolean isDataNode; - Ticker() { + Ticker(boolean isDataNode) { super("ticker-schedule-trigger-engine"); + this.isDataNode = isDataNode; setDaemon(true); - start(); + if (isDataNode) { + start(); + } } @Override @@ -167,15 +172,17 @@ public void run() { } public void close() { - logger.trace("stopping ticker thread"); - active = false; - try { - closeLatch.await(); - } catch (InterruptedException e) { - logger.warn("caught an interrupted exception when waiting while closing ticker thread", e); - Thread.currentThread().interrupt(); + if (isDataNode) { + logger.trace("stopping ticker thread"); + active = false; + try { + closeLatch.await(); + } catch (InterruptedException e) { + logger.warn("caught an interrupted exception when waiting while closing ticker thread", e); + Thread.currentThread().interrupt(); + } + logger.trace("ticker thread stopped"); } - logger.trace("ticker thread stopped"); } } } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java index 86375c0ea4862..316cb722f2f1e 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.watcher; -import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.Version; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -25,16 +24,13 @@ import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.ThreadContext; import 
org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.watcher.WatcherMetaData; import org.elasticsearch.xpack.core.watcher.WatcherState; -import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.junit.Before; import org.mockito.stubbing.Answer; @@ -52,6 +48,7 @@ import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.TRIGGERED_TEMPLATE_NAME; import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.WATCHES_TEMPLATE_NAME; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; @@ -59,8 +56,11 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; public class WatcherLifeCycleServiceTests extends ESTestCase { @@ -70,8 +70,6 @@ public class WatcherLifeCycleServiceTests extends ESTestCase { @Before public void prepareServices() { - ThreadPool threadPool = mock(ThreadPool.class); - when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); ClusterService clusterService = mock(ClusterService.class); Answer answer = invocationOnMock -> { AckedClusterStateUpdateTask updateTask = (AckedClusterStateUpdateTask) 
invocationOnMock.getArguments()[1]; @@ -80,276 +78,233 @@ public void prepareServices() { }; doAnswer(answer).when(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class)); watcherService = mock(WatcherService.class); - lifeCycleService = new WatcherLifeCycleService(Settings.EMPTY, clusterService, watcherService, - EsExecutors.newDirectExecutorService()) { - @Override - void stopExecutor() { - // direct executor cannot be terminated - } - }; + lifeCycleService = new WatcherLifeCycleService(Settings.EMPTY, clusterService, watcherService); } - public void testStartAndStopCausedByClusterState() throws Exception { + public void testNoRestartWithoutAllocationIdsConfigured() { IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(new Index("anything", "foo")).build(); ClusterState previousClusterState = ClusterState.builder(new ClusterName("my-cluster")) - .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1"))) - .routingTable(RoutingTable.builder().add(indexRoutingTable).build()) - .build(); + .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1"))) + .routingTable(RoutingTable.builder().add(indexRoutingTable).build()) + .build(); IndexRoutingTable watchRoutingTable = IndexRoutingTable.builder(new Index(Watch.INDEX, "foo")).build(); ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster")) - .metaData(MetaData.builder() - .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .build()) - .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1"))) - .routingTable(RoutingTable.builder().add(watchRoutingTable).build()) - .build(); - - 
when(watcherService.state()).thenReturn(WatcherState.STOPPED); + .metaData(MetaData.builder() + .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .build()) + .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1"))) + .routingTable(RoutingTable.builder().add(watchRoutingTable).build()) + .build(); + when(watcherService.validate(clusterState)).thenReturn(true); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, previousClusterState)); - verify(watcherService, times(1)).start(clusterState); - verify(watcherService, never()).stop(anyString()); + verifyZeroInteractions(watcherService); - // Trying to start a second time, but that should have no affect. - when(watcherService.state()).thenReturn(WatcherState.STARTED); + // Trying to start a second time, but that should have no effect. 
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, previousClusterState)); - verify(watcherService, times(1)).start(clusterState); - verify(watcherService, never()).stop(anyString()); + verifyZeroInteractions(watcherService); } - public void testStartWithStateNotRecoveredBlock() throws Exception { + public void testStartWithStateNotRecoveredBlock() { DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1"); ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster")) - .blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) - .nodes(nodes).build(); - when(watcherService.state()).thenReturn(WatcherState.STOPPED); + .blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) + .nodes(nodes).build(); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); - verify(watcherService, never()).start(any(ClusterState.class)); + verifyZeroInteractions(watcherService); } - public void testShutdown() throws Exception { + public void testShutdown() { IndexRoutingTable watchRoutingTable = IndexRoutingTable.builder(new Index(Watch.INDEX, "foo")).build(); ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster")) - .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1"))) - .routingTable(RoutingTable.builder().add(watchRoutingTable).build()) - .metaData(MetaData.builder() - .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .build()) - .build(); + .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1"))) + 
.routingTable(RoutingTable.builder().add(watchRoutingTable).build()) + .metaData(MetaData.builder() + .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .build()) + .build(); when(watcherService.validate(clusterState)).thenReturn(true); - when(watcherService.state()).thenReturn(WatcherState.STOPPED); - lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", clusterState, clusterState)); - verify(watcherService, times(1)).start(any(ClusterState.class)); - verify(watcherService, never()).stop(anyString()); - - when(watcherService.state()).thenReturn(WatcherState.STARTED); lifeCycleService.shutDown(); - verify(watcherService, times(1)).start(any(ClusterState.class)); - verify(watcherService, times(1)).stop(eq("shutdown initiated")); + verify(watcherService, never()).stop(anyString()); + verify(watcherService, times(1)).shutDown(); - when(watcherService.state()).thenReturn(WatcherState.STOPPED); + reset(watcherService); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); - verify(watcherService, times(1)).start(any(ClusterState.class)); - verify(watcherService, times(1)).stop(eq("shutdown initiated")); + verifyZeroInteractions(watcherService); } - public void testManualStartStop() throws Exception { - IndexRoutingTable watchRoutingTable = IndexRoutingTable.builder(new Index(Watch.INDEX, "foo")).build(); + public void testManualStartStop() { + Index index = new Index(Watch.INDEX, "uuid"); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + indexRoutingTableBuilder.addShard( + TestShardRouting.newShardRouting(Watch.INDEX, 0, "node_1", true, ShardRoutingState.STARTED)); + IndexMetaData.Builder indexMetaDataBuilder = 
IndexMetaData.builder(Watch.INDEX).settings(settings(Version.CURRENT) + .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6)) // the internal index format, required + .numberOfShards(1).numberOfReplicas(0); + MetaData.Builder metaDataBuilder = MetaData.builder() + .put(indexMetaDataBuilder) + .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())); + if (randomBoolean()) { + metaDataBuilder.putCustom(WatcherMetaData.TYPE, new WatcherMetaData(false)); + } + MetaData metaData = metaDataBuilder.build(); + IndexRoutingTable indexRoutingTable = indexRoutingTableBuilder.build(); ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster")) - .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1"))) - .routingTable(RoutingTable.builder().add(watchRoutingTable).build()) - .metaData(MetaData.builder() - .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .build()) - .build(); + .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1"))) + .routingTable(RoutingTable.builder().add(indexRoutingTable).build()) + .metaData(metaData) + .build(); when(watcherService.validate(clusterState)).thenReturn(true); - when(watcherService.state()).thenReturn(WatcherState.STOPPED); - lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", clusterState, clusterState)); - verify(watcherService, times(1)).start(any(ClusterState.class)); - verify(watcherService, never()).stop(anyString()); + // mark watcher manually as stopped + 
ClusterState stoppedClusterState = ClusterState.builder(new ClusterName("my-cluster")) + .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1"))) + .routingTable(RoutingTable.builder().add(indexRoutingTable).build()) + .metaData(MetaData.builder(metaData).putCustom(WatcherMetaData.TYPE, new WatcherMetaData(true)).build()) + .build(); - when(watcherService.state()).thenReturn(WatcherState.STARTED); - String reason = randomAlphaOfLength(10); - lifeCycleService.stop(reason); - verify(watcherService, times(1)).start(any(ClusterState.class)); - verify(watcherService, times(1)).stop(eq(reason)); + lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stoppedClusterState, clusterState)); + verify(watcherService, times(1)).stop(eq("watcher manually marked to shutdown by cluster state update")); - // Starting via cluster state update, we shouldn't start because we have been stopped manually. - when(watcherService.state()).thenReturn(WatcherState.STOPPED); - lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); - verify(watcherService, times(2)).start(any(ClusterState.class)); - verify(watcherService, times(1)).stop(eq(reason)); + // Starting via cluster state update, as the watcher metadata block is removed/set to false + reset(watcherService); + when(watcherService.validate(clusterState)).thenReturn(true); + lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, stoppedClusterState)); + verify(watcherService, times(1)).start(eq(clusterState)); // no change, keep going - clusterState = ClusterState.builder(new ClusterName("my-cluster")) - .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1"))) - .metaData(MetaData.builder() - .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) - 
.put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .build()) - .build(); - when(watcherService.state()).thenReturn(WatcherState.STARTED); + reset(watcherService); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); - verify(watcherService, times(2)).start(any(ClusterState.class)); - verify(watcherService, times(1)).stop(eq(reason)); - - ClusterState previousClusterState = ClusterState.builder(new ClusterName("my-cluster")) - .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1"))) - .metaData(MetaData.builder() - .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .build()) - .build(); - when(watcherService.validate(clusterState)).thenReturn(true); - when(watcherService.state()).thenReturn(WatcherState.STOPPED); - lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, previousClusterState)); - verify(watcherService, times(3)).start(any(ClusterState.class)); - verify(watcherService, times(1)).stop(eq(reason)); + verifyZeroInteractions(watcherService); } - public void testManualStartStopClusterStateNotValid() throws Exception { - DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1"); - ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster")) - .nodes(nodes).build(); - when(watcherService.state()).thenReturn(WatcherState.STOPPED); - when(watcherService.validate(clusterState)).thenReturn(false); - - lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", clusterState, clusterState)); - - verify(watcherService, never()).start(any(ClusterState.class)); - verify(watcherService, never()).stop(anyString()); - } - - public 
void testManualStartStopWatcherNotStopped() throws Exception { - DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1"); - ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster")) - .nodes(nodes).build(); - when(watcherService.state()).thenReturn(WatcherState.STOPPING); - - lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", clusterState, clusterState)); - verify(watcherService, never()).validate(any(ClusterState.class)); - verify(watcherService, never()).start(any(ClusterState.class)); - verify(watcherService, never()).stop(anyString()); - } - - public void testNoLocalShards() throws Exception { + public void testNoLocalShards() { Index watchIndex = new Index(Watch.INDEX, "foo"); ShardId shardId = new ShardId(watchIndex, 0); DiscoveryNodes nodes = new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1") - .add(newNode("node_1")).add(newNode("node_2")) - .build(); + .add(newNode("node_1")).add(newNode("node_2")) + .build(); IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX) - .settings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - ).build(); + .settings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6) + ).build(); IndexRoutingTable watchRoutingTable = IndexRoutingTable.builder(watchIndex) - .addShard(randomBoolean() ? - TestShardRouting.newShardRouting(shardId, "node_1", true, STARTED) : - TestShardRouting.newShardRouting(shardId, "node_1", "node_2", true, RELOCATING)) - .build(); + .addShard(randomBoolean() ? 
+ TestShardRouting.newShardRouting(shardId, "node_1", true, STARTED) : + TestShardRouting.newShardRouting(shardId, "node_1", "node_2", true, RELOCATING)) + .build(); ClusterState clusterStateWithLocalShards = ClusterState.builder(new ClusterName("my-cluster")) - .nodes(nodes) - .routingTable(RoutingTable.builder().add(watchRoutingTable).build()) - .metaData(MetaData.builder().put(indexMetaData, false)) - .build(); + .nodes(nodes) + .routingTable(RoutingTable.builder().add(watchRoutingTable).build()) + .metaData(MetaData.builder().put(indexMetaData, false)) + .build(); // shard moved over to node 2 IndexRoutingTable watchRoutingTableNode2 = IndexRoutingTable.builder(watchIndex) - .addShard(randomBoolean() ? - TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED) : - TestShardRouting.newShardRouting(shardId, "node_2", "node_1", true, RELOCATING)) - .build(); + .addShard(randomBoolean() ? + TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED) : + TestShardRouting.newShardRouting(shardId, "node_2", "node_1", true, RELOCATING)) + .build(); ClusterState clusterStateWithoutLocalShards = ClusterState.builder(new ClusterName("my-cluster")) - .nodes(nodes) - .routingTable(RoutingTable.builder().add(watchRoutingTableNode2).build()) - .metaData(MetaData.builder().put(indexMetaData, false)) - .build(); - - when(watcherService.state()).thenReturn(WatcherState.STARTED); + .nodes(nodes) + .routingTable(RoutingTable.builder().add(watchRoutingTableNode2).build()) + .metaData(MetaData.builder().put(indexMetaData, false)) + .build(); // set current allocation ids + when(watcherService.validate(eq(clusterStateWithLocalShards))).thenReturn(true); + when(watcherService.validate(eq(clusterStateWithoutLocalShards))).thenReturn(false); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithLocalShards, clusterStateWithoutLocalShards)); - verify(watcherService, times(0)).pauseExecution(eq("no local watcher shards found")); + 
verify(watcherService, times(1)).reload(eq(clusterStateWithLocalShards), eq("new local watcher shard allocation ids")); + verify(watcherService, times(1)).validate(eq(clusterStateWithLocalShards)); + verifyNoMoreInteractions(watcherService); - // no more local hards, lets pause execution + // no more local shards, lets pause execution + reset(watcherService); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithoutLocalShards, clusterStateWithLocalShards)); verify(watcherService, times(1)).pauseExecution(eq("no local watcher shards found")); + verifyNoMoreInteractions(watcherService); // no further invocations should happen if the cluster state does not change in regard to local shards + reset(watcherService); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithoutLocalShards, clusterStateWithoutLocalShards)); - verify(watcherService, times(1)).pauseExecution(eq("no local watcher shards found")); + verifyZeroInteractions(watcherService); } - public void testReplicaWasAddedOrRemoved() throws Exception { + public void testReplicaWasAddedOrRemoved() { Index watchIndex = new Index(Watch.INDEX, "foo"); ShardId shardId = new ShardId(watchIndex, 0); ShardId secondShardId = new ShardId(watchIndex, 1); DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1") - .add(newNode("node_1")) - .add(newNode("node_2")) - .build(); + .add(newNode("node_1")) + .add(newNode("node_2")) + .build(); IndexRoutingTable previousWatchRoutingTable = IndexRoutingTable.builder(watchIndex) - .addShard(TestShardRouting.newShardRouting(secondShardId, "node_1", true, STARTED)) - .addShard(TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED)) - .build(); + .addShard(TestShardRouting.newShardRouting(secondShardId, "node_1", true, STARTED)) + .addShard(TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED)) + .build(); IndexMetaData indexMetaData = 
IndexMetaData.builder(Watch.INDEX) - .settings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - ).build(); + .settings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6) + ).build(); ClusterState stateWithPrimaryShard = ClusterState.builder(new ClusterName("my-cluster")) - .nodes(discoveryNodes) - .routingTable(RoutingTable.builder().add(previousWatchRoutingTable).build()) - .metaData(MetaData.builder().put(indexMetaData, false)) - .build(); + .nodes(discoveryNodes) + .routingTable(RoutingTable.builder().add(previousWatchRoutingTable).build()) + .metaData(MetaData.builder().put(indexMetaData, false)) + .build(); IndexRoutingTable currentWatchRoutingTable = IndexRoutingTable.builder(watchIndex) - .addShard(TestShardRouting.newShardRouting(shardId, "node_1", false, STARTED)) - .addShard(TestShardRouting.newShardRouting(secondShardId, "node_1", true, STARTED)) - .addShard(TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED)) - .build(); + .addShard(TestShardRouting.newShardRouting(shardId, "node_1", false, STARTED)) + .addShard(TestShardRouting.newShardRouting(secondShardId, "node_1", true, STARTED)) + .addShard(TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED)) + .build(); ClusterState stateWithReplicaAdded = ClusterState.builder(new ClusterName("my-cluster")) - .nodes(discoveryNodes) - .routingTable(RoutingTable.builder().add(currentWatchRoutingTable).build()) - .metaData(MetaData.builder().put(indexMetaData, false)) - .build(); + .nodes(discoveryNodes) + .routingTable(RoutingTable.builder().add(currentWatchRoutingTable).build()) + .metaData(MetaData.builder().put(indexMetaData, false)) + .build(); // randomize 
between addition or removal of a replica boolean replicaAdded = randomBoolean(); - ClusterChangedEvent event; - ClusterState usedClusterState; + ClusterChangedEvent firstEvent; + ClusterChangedEvent secondEvent; if (replicaAdded) { - event = new ClusterChangedEvent("any", stateWithReplicaAdded, stateWithPrimaryShard); - usedClusterState = stateWithReplicaAdded; + firstEvent = new ClusterChangedEvent("any", stateWithPrimaryShard, stateWithReplicaAdded); + secondEvent = new ClusterChangedEvent("any", stateWithReplicaAdded, stateWithPrimaryShard); } else { - event = new ClusterChangedEvent("any", stateWithPrimaryShard, stateWithReplicaAdded); - usedClusterState = stateWithPrimaryShard; + firstEvent = new ClusterChangedEvent("any", stateWithReplicaAdded, stateWithPrimaryShard); + secondEvent = new ClusterChangedEvent("any", stateWithPrimaryShard, stateWithReplicaAdded); } - when(watcherService.state()).thenReturn(WatcherState.STARTED); - lifeCycleService.clusterChanged(event); - verify(watcherService).reload(eq(usedClusterState), anyString()); + when(watcherService.validate(eq(firstEvent.state()))).thenReturn(true); + lifeCycleService.clusterChanged(firstEvent); + verify(watcherService).reload(eq(firstEvent.state()), anyString()); + + reset(watcherService); + when(watcherService.validate(eq(secondEvent.state()))).thenReturn(true); + lifeCycleService.clusterChanged(secondEvent); + verify(watcherService).reload(eq(secondEvent.state()), anyString()); } // make sure that cluster state changes can be processed on nodes that do not hold data @@ -360,43 +315,42 @@ public void testNonDataNode() { IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index).addShard(shardRouting); DiscoveryNode node1 = new DiscoveryNode("node_1", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(), - new HashSet<>(asList(randomFrom(DiscoveryNode.Role.INGEST, DiscoveryNode.Role.MASTER))), Version.CURRENT); + new 
HashSet<>(asList(randomFrom(DiscoveryNode.Role.INGEST, DiscoveryNode.Role.MASTER))), Version.CURRENT); DiscoveryNode node2 = new DiscoveryNode("node_2", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(), - new HashSet<>(asList(DiscoveryNode.Role.DATA)), Version.CURRENT); + new HashSet<>(asList(DiscoveryNode.Role.DATA)), Version.CURRENT); DiscoveryNode node3 = new DiscoveryNode("node_3", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(), - new HashSet<>(asList(DiscoveryNode.Role.DATA)), Version.CURRENT); + new HashSet<>(asList(DiscoveryNode.Role.DATA)), Version.CURRENT); IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(Watch.INDEX) - .settings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - ); + .settings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + ); ClusterState previousState = ClusterState.builder(new ClusterName("my-cluster")) - .metaData(MetaData.builder().put(indexMetaDataBuilder)) - .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(node1).add(node2).add(node3)) - .routingTable(RoutingTable.builder().add(indexRoutingTable).build()) - .build(); + .metaData(MetaData.builder().put(indexMetaDataBuilder)) + .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(node1).add(node2).add(node3)) + .routingTable(RoutingTable.builder().add(indexRoutingTable).build()) + .build(); IndexMetaData.Builder newIndexMetaDataBuilder = IndexMetaData.builder(Watch.INDEX) - .settings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - ); + 
.settings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + ); ShardRouting replicaShardRouting = TestShardRouting.newShardRouting(shardId, "node3", false, STARTED); IndexRoutingTable.Builder newRoutingTable = IndexRoutingTable.builder(index).addShard(shardRouting).addShard(replicaShardRouting); ClusterState currentState = ClusterState.builder(new ClusterName("my-cluster")) - .metaData(MetaData.builder().put(newIndexMetaDataBuilder)) - .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(node1).add(node2).add(node3)) - .routingTable(RoutingTable.builder().add(newRoutingTable).build()) - .build(); + .metaData(MetaData.builder().put(newIndexMetaDataBuilder)) + .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(node1).add(node2).add(node3)) + .routingTable(RoutingTable.builder().add(newRoutingTable).build()) + .build(); - when(watcherService.state()).thenReturn(WatcherState.STARTED); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", currentState, previousState)); verify(watcherService, times(0)).pauseExecution(anyObject()); verify(watcherService, times(0)).reload(any(), any()); @@ -406,76 +360,48 @@ public void testThatMissingWatcherIndexMetadataOnlyResetsOnce() { Index watchIndex = new Index(Watch.INDEX, "foo"); ShardId shardId = new ShardId(watchIndex, 0); IndexRoutingTable watchRoutingTable = IndexRoutingTable.builder(watchIndex) - .addShard(TestShardRouting.newShardRouting(shardId, "node_1", true, STARTED)).build(); + .addShard(TestShardRouting.newShardRouting(shardId, "node_1", true, STARTED)).build(); DiscoveryNodes nodes = new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1")).build(); IndexMetaData.Builder newIndexMetaDataBuilder = IndexMetaData.builder(Watch.INDEX) - .settings(Settings.builder() - 
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - ); + .settings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6) + ); ClusterState clusterStateWithWatcherIndex = ClusterState.builder(new ClusterName("my-cluster")) - .nodes(nodes) - .routingTable(RoutingTable.builder().add(watchRoutingTable).build()) - .metaData(MetaData.builder().put(newIndexMetaDataBuilder)) - .build(); + .nodes(nodes) + .routingTable(RoutingTable.builder().add(watchRoutingTable).build()) + .metaData(MetaData.builder().put(newIndexMetaDataBuilder)) + .build(); ClusterState clusterStateWithoutWatcherIndex = ClusterState.builder(new ClusterName("my-cluster")) - .nodes(nodes) - .build(); + .nodes(nodes) + .build(); - when(watcherService.state()).thenReturn(WatcherState.STARTED); + when(watcherService.validate(eq(clusterStateWithWatcherIndex))).thenReturn(true); + when(watcherService.validate(eq(clusterStateWithoutWatcherIndex))).thenReturn(false); // first add the shard allocation ids, by going from empty cs to CS with watcher index lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithWatcherIndex, clusterStateWithoutWatcherIndex)); + verify(watcherService).reload(eq(clusterStateWithWatcherIndex), anyString()); // now remove watches index, and ensure that pausing is only called once, no matter how often called (i.e. 
each CS update) lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithoutWatcherIndex, clusterStateWithWatcherIndex)); verify(watcherService, times(1)).pauseExecution(anyObject()); + reset(watcherService); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithoutWatcherIndex, clusterStateWithWatcherIndex)); - verify(watcherService, times(1)).pauseExecution(anyObject()); - } - - public void testWatcherDoesNotStartWithOldIndexFormat() throws Exception { - String index = randomFrom(Watch.INDEX, TriggeredWatchStoreField.INDEX_NAME); - Index watchIndex = new Index(index, "foo"); - ShardId shardId = new ShardId(watchIndex, 0); - IndexRoutingTable watchRoutingTable = IndexRoutingTable.builder(watchIndex) - .addShard(TestShardRouting.newShardRouting(shardId, "node_1", true, STARTED)).build(); - DiscoveryNodes nodes = new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1")).build(); - - Settings.Builder indexSettings = Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT); - // no matter if not set or set to one, watcher should not start - if (randomBoolean()) { - indexSettings.put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 1); - } - IndexMetaData.Builder newIndexMetaDataBuilder = IndexMetaData.builder(index).settings(indexSettings); - - ClusterState clusterStateWithWatcherIndex = ClusterState.builder(new ClusterName("my-cluster")) - .nodes(nodes) - .routingTable(RoutingTable.builder().add(watchRoutingTable).build()) - .metaData(MetaData.builder().put(newIndexMetaDataBuilder)) - .build(); - - ClusterState emptyClusterState = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).build(); - - when(watcherService.state()).thenReturn(WatcherState.STOPPED); - when(watcherService.validate(eq(clusterStateWithWatcherIndex))).thenReturn(true); - 
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithWatcherIndex, emptyClusterState)); - verify(watcherService, never()).start(any(ClusterState.class)); + verifyZeroInteractions(watcherService); } public void testWatcherServiceDoesNotStartIfIndexTemplatesAreMissing() throws Exception { DiscoveryNodes nodes = new DiscoveryNodes.Builder() - .masterNodeId("node_1").localNodeId("node_1") - .add(newNode("node_1")) - .build(); + .masterNodeId("node_1").localNodeId("node_1") + .add(newNode("node_1")) + .build(); MetaData.Builder metaDataBuilder = MetaData.builder(); boolean isHistoryTemplateAdded = randomBoolean(); @@ -495,191 +421,103 @@ public void testWatcherServiceDoesNotStartIfIndexTemplatesAreMissing() throws Ex } ClusterState state = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).metaData(metaDataBuilder).build(); when(watcherService.validate(eq(state))).thenReturn(true); - when(watcherService.state()).thenReturn(WatcherState.STOPPED); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, state)); verify(watcherService, times(0)).start(any(ClusterState.class)); } public void testWatcherStopsWhenMasterNodeIsMissing() { + startWatcher(); + DiscoveryNodes nodes = new DiscoveryNodes.Builder() - .localNodeId("node_1") - .add(newNode("node_1")) - .build(); + .localNodeId("node_1") + .add(newNode("node_1")) + .build(); ClusterState state = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).build(); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, state)); - verify(watcherService, times(1)).stop(eq("no master node")); + verify(watcherService, times(1)).pauseExecution(eq("no master node")); } public void testWatcherStopsOnClusterLevelBlock() { + startWatcher(); + DiscoveryNodes nodes = new DiscoveryNodes.Builder() - .localNodeId("node_1") - .masterNodeId("node_1") - .add(newNode("node_1")) - .build(); + .localNodeId("node_1") + .masterNodeId("node_1") + .add(newNode("node_1")) + 
.build(); ClusterBlocks clusterBlocks = ClusterBlocks.builder().addGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_WRITES).build(); ClusterState state = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).blocks(clusterBlocks).build(); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, state)); - verify(watcherService, times(1)).stop(eq("write level cluster block")); + verify(watcherService, times(1)).pauseExecution(eq("write level cluster block")); } - public void testStateIsSetImmediately() throws Exception { - Index index = new Index(Watch.INDEX, "foo"); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addShard( - TestShardRouting.newShardRouting(Watch.INDEX, 0, "node_1", true, ShardRoutingState.STARTED)); - IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(Watch.INDEX).settings(settings(Version.CURRENT) - .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6)) // the internal index format, required - .numberOfShards(1).numberOfReplicas(0); + public void testMasterOnlyNodeCanStart() { + List roles = Collections.singletonList(randomFrom(DiscoveryNode.Role.MASTER, DiscoveryNode.Role.INGEST)); ClusterState state = ClusterState.builder(new ClusterName("my-cluster")) - .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1") - .add(newNode("node_1"))) - .routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build()) - .metaData(MetaData.builder() - .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(indexMetaDataBuilder) - .build()) - .build(); - when(watcherService.validate(state)).thenReturn(true); - when(watcherService.state()).thenReturn(WatcherState.STOPPED); + .nodes(new 
DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1") + .add(new DiscoveryNode("node_1", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(), + new HashSet<>(roles), Version.CURRENT))).build(); - lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, state)); - verify(watcherService, times(1)).start(eq(state)); - assertThat(lifeCycleService.allocationIds(), hasSize(1)); + lifeCycleService.clusterChanged(new ClusterChangedEvent("test", state, state)); + assertThat(lifeCycleService.getState(), is(WatcherState.STARTED)); + } - // now do any cluster state upgrade, see that reload gets triggers, but should not - when(watcherService.state()).thenReturn(WatcherState.STARTED); - lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, state)); - verify(watcherService, never()).pauseExecution(anyString()); + public void testDataNodeWithoutDataCanStart() { + MetaData metaData = MetaData.builder().put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .build(); + ClusterState state = ClusterState.builder(new ClusterName("my-cluster")) + .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1"))) + .metaData(metaData) + .build(); - verify(watcherService, never()).reload(eq(state), anyString()); - assertThat(lifeCycleService.allocationIds(), hasSize(1)); + lifeCycleService.clusterChanged(new ClusterChangedEvent("test", state, state)); + assertThat(lifeCycleService.getState(), is(WatcherState.STARTED)); } - public void testWatcherServiceExceptionsAreCaught() { - Index index = new Index(Watch.INDEX, "foo"); + private ClusterState startWatcher() { + Index index = new Index(Watch.INDEX, "uuid"); IndexRoutingTable.Builder indexRoutingTableBuilder = 
IndexRoutingTable.builder(index); indexRoutingTableBuilder.addShard( - TestShardRouting.newShardRouting(Watch.INDEX, 0, "node_1", true, ShardRoutingState.STARTED)); - IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX).settings(settings(Version.CURRENT) - .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6)) // the internal index format, required - .numberOfShards(1).numberOfReplicas(0).build(); - - // special setup for one of the following cluster states - DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class); - DiscoveryNode localNode = mock(DiscoveryNode.class); - when(discoveryNodes.getMasterNodeId()).thenReturn("node_1"); - when(discoveryNodes.getLocalNode()).thenReturn(localNode); - when(localNode.isDataNode()).thenReturn(true); - when(localNode.getId()).thenReturn("does_not_exist"); - - ClusterState clusterState = randomFrom( - // cluster state with no watcher index - ClusterState.builder(new ClusterName("my-cluster")) - .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1"))) - .metaData(MetaData.builder() - .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .build()) - .build(), - // cluster state with no routing node - ClusterState.builder(new ClusterName("my-cluster")) - .nodes(discoveryNodes) - .metaData(MetaData.builder() - .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .build()) - .build(), - - // cluster state with no local shards - ClusterState.builder(new ClusterName("my-cluster")) - .nodes(new 
DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1"))) - .metaData(MetaData.builder() - .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(indexMetaData, true) - .build()) - .build() - ); - - ClusterState stateWithWatcherShards = ClusterState.builder(new ClusterName("my-cluster")) - .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1") - .add(newNode("node_1"))) - .routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build()) - .metaData(MetaData.builder() - .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(indexMetaData, true) - .build()) - .build(); - - lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stateWithWatcherShards, stateWithWatcherShards)); - - when(watcherService.validate(anyObject())).thenReturn(true); - when(watcherService.state()).thenReturn(WatcherState.STARTED); - doAnswer(invocation -> { - throw new ElasticsearchSecurityException("breakme"); - }).when(watcherService).pauseExecution(anyString()); - - lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", clusterState, stateWithWatcherShards)); - verify(watcherService, times(1)).pauseExecution(anyString()); - } + TestShardRouting.newShardRouting(Watch.INDEX, 0, "node_1", true, ShardRoutingState.STARTED)); + IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(Watch.INDEX).settings(settings(Version.CURRENT) + .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6)) // the internal index format, required + 
.numberOfShards(1).numberOfReplicas(0); + MetaData metaData = MetaData.builder().put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())).put(indexMetaDataBuilder) + .build(); + ClusterState state = ClusterState.builder(new ClusterName("my-cluster")) + .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1") + .add(newNode("node_1"))) + .routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build()) + .metaData(metaData) + .build(); + ClusterState emptyState = ClusterState.builder(new ClusterName("my-cluster")) + .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1") + .add(newNode("node_1"))) + .metaData(metaData) + .build(); - public void testWatcherServiceExceptionsAreCaughtOnReload() { - Index index = new Index(Watch.INDEX, "foo"); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addShard( - TestShardRouting.newShardRouting(Watch.INDEX, 0, "node_1", true, ShardRoutingState.STARTED)); - IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX).settings(settings(Version.CURRENT) - .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6)) // the internal index format, required - .numberOfShards(1).numberOfReplicas(0).build(); + when(watcherService.validate(state)).thenReturn(true); - // cluster state with different local shards (another shard id) - ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster")) - .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1"))).routingTable( - RoutingTable.builder().add(IndexRoutingTable.builder(index) - .addShard(TestShardRouting.newShardRouting(Watch.INDEX, 1, "node_1", true, ShardRoutingState.STARTED)) 
- .build()).build()).metaData( - MetaData.builder().put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(indexMetaData, true).build()).build(); - - ClusterState stateWithWatcherShards = ClusterState.builder(new ClusterName("my-cluster")) - .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1") - .add(newNode("node_1"))) - .routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build()) - .metaData(MetaData.builder() - .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) - .put(indexMetaData, true) - .build()) - .build(); - - lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stateWithWatcherShards, stateWithWatcherShards)); - - when(watcherService.validate(anyObject())).thenReturn(true); - when(watcherService.state()).thenReturn(WatcherState.STARTED); - doAnswer(invocation -> { - throw new ElasticsearchSecurityException("breakme"); - }).when(watcherService).reload(eq(clusterState), anyString()); - - lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", clusterState, stateWithWatcherShards)); - verify(watcherService, times(1)).reload(eq(clusterState), anyString()); + lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, emptyState)); + assertThat(lifeCycleService.getState(), is(WatcherState.STARTED)); + verify(watcherService, times(1)).reload(eq(state), anyString()); + assertThat(lifeCycleService.allocationIds(), hasSize(1)); + + // reset the mock, the user has to mock everything themselves again + reset(watcherService); + return state; 
} private List randomIndexPatterns() { return IntStream.range(0, between(1, 10)) - .mapToObj(n -> randomAlphaOfLengthBetween(1, 100)) - .collect(Collectors.toList()); + .mapToObj(n -> randomAlphaOfLengthBetween(1, 100)) + .collect(Collectors.toList()); } private static DiscoveryNode newNode(String nodeName) { @@ -688,6 +526,6 @@ private static DiscoveryNode newNode(String nodeName) { private static DiscoveryNode newNode(String nodeName, Version version) { return new DiscoveryNode(nodeName, ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(), - new HashSet<>(asList(DiscoveryNode.Role.values())), version); + new HashSet<>(asList(DiscoveryNode.Role.values())), version); } } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java index 0622ab48227fe..92726fb94cd43 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; @@ -55,12 +56,12 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.concurrent.ExecutorService; import static java.util.Arrays.asList; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static 
org.mockito.Mockito.verify; @@ -68,15 +69,20 @@ public class WatcherServiceTests extends ESTestCase { - public void testValidateStartWithClosedIndex() throws Exception { + private final ExecutorService executorService = EsExecutors.newDirectExecutorService(); + + public void testValidateStartWithClosedIndex() { TriggerService triggerService = mock(TriggerService.class); TriggeredWatchStore triggeredWatchStore = mock(TriggeredWatchStore.class); ExecutionService executionService = mock(ExecutionService.class); - when(executionService.validate(anyObject())).thenReturn(true); WatchParser parser = mock(WatchParser.class); WatcherService service = new WatcherService(Settings.EMPTY, triggerService, triggeredWatchStore, - executionService, parser, mock(Client.class)); + executionService, parser, mock(Client.class), executorService) { + @Override + void stopExecutor() { + } + }; ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); MetaData.Builder metaDataBuilder = MetaData.builder(); @@ -97,14 +103,17 @@ public void testLoadOnlyActiveWatches() throws Exception { TriggerService triggerService = mock(TriggerService.class); TriggeredWatchStore triggeredWatchStore = mock(TriggeredWatchStore.class); ExecutionService executionService = mock(ExecutionService.class); - when(executionService.validate(anyObject())).thenReturn(true); WatchParser parser = mock(WatchParser.class); Client client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); WatcherService service = new WatcherService(settings, triggerService, triggeredWatchStore, - executionService, parser, client); + executionService, parser, client, executorService) { + @Override + void stopExecutor() { + } + }; // cluster state setup, with one node, one shard @@ -199,4 +208,4 @@ private static DiscoveryNode newNode() { return new 
DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(), new HashSet<>(asList(DiscoveryNode.Role.values())), Version.CURRENT); } -} \ No newline at end of file +} diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index 9684c55692f1b..73f0e82072055 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.DeprecationHandler; @@ -147,9 +148,7 @@ public void init() throws Exception { when(clusterService.localNode()).thenReturn(discoveryNode); executionService = new ExecutionService(Settings.EMPTY, historyStore, triggeredWatchStore, executor, clock, parser, - clusterService, client); - - executionService.start(); + clusterService, client, EsExecutors.newDirectExecutorService()); } public void testExecute() throws Exception { diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java index 07ba254dea9a3..f38f4ad6a86e8 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java +++ 
b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java @@ -85,9 +85,9 @@ public class TriggeredWatchStoreTests extends ESTestCase { private Settings indexSettings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); private Client client; private TriggeredWatch.Parser parser; @@ -101,10 +101,9 @@ public void init() { when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); parser = mock(TriggeredWatch.Parser.class); triggeredWatchStore = new TriggeredWatchStore(Settings.EMPTY, client, parser); - triggeredWatchStore.start(); } - public void testFindTriggeredWatchesEmptyCollection() throws Exception { + public void testFindTriggeredWatchesEmptyCollection() { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("name")); Collection triggeredWatches = triggeredWatchStore.findTriggeredWatches(Collections.emptyList(), csBuilder.build()); assertThat(triggeredWatches, hasSize(0)); @@ -112,10 +111,10 @@ public void testFindTriggeredWatchesEmptyCollection() throws Exception { public void testValidateNoIndex() { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("name")); - assertThat(triggeredWatchStore.validate(csBuilder.build()), is(true)); + assertThat(TriggeredWatchStore.validate(csBuilder.build()), is(true)); } - public void testValidateNoActivePrimaryShards() throws Exception { + public void testValidateNoActivePrimaryShards() { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("name")); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); @@ -124,11 +123,11 @@ public void testValidateNoActivePrimaryShards() throws Exception { int numShards = 2 + randomInt(2); int numStartedShards = 1; Settings settings = 
settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); metaDataBuilder.put(IndexMetaData.builder(TriggeredWatchStoreField.INDEX_NAME).settings(settings) - .numberOfShards(numShards).numberOfReplicas(1)); + .numberOfShards(numShards).numberOfReplicas(1)); final Index index = metaDataBuilder.get(TriggeredWatchStoreField.INDEX_NAME).getIndex(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); for (int i = 0; i < numShards; i++) { @@ -143,9 +142,9 @@ public void testValidateNoActivePrimaryShards() throws Exception { } ShardId shardId = new ShardId(index, 0); indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId) - .addShard(TestShardRouting.newShardRouting(shardId, currentNodeId, null, true, state, - new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""))) - .build()); + .addShard(TestShardRouting.newShardRouting(shardId, currentNodeId, null, true, state, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""))) + .build()); indexRoutingTableBuilder.addReplica(); } routingTableBuilder.add(indexRoutingTableBuilder.build()); @@ -154,10 +153,10 @@ public void testValidateNoActivePrimaryShards() throws Exception { csBuilder.routingTable(routingTableBuilder.build()); ClusterState cs = csBuilder.build(); - assertThat(triggeredWatchStore.validate(cs), is(false)); + assertThat(TriggeredWatchStore.validate(cs), is(false)); } - public void testFindTriggeredWatchesGoodCase() throws Exception { + public void testFindTriggeredWatchesGoodCase() { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); @@ -167,8 +166,8 @@ public void testFindTriggeredWatchesGoodCase() throws Exception { 
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); ShardId shardId = new ShardId(index, 0); indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId) - .addShard(TestShardRouting.newShardRouting(shardId, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); + .addShard(TestShardRouting.newShardRouting(shardId, "_node_id", null, true, ShardRoutingState.STARTED)) + .build()); indexRoutingTableBuilder.addReplica(); routingTableBuilder.add(indexRoutingTableBuilder.build()); csBuilder.metaData(metaDataBuilder); @@ -206,7 +205,7 @@ public void testFindTriggeredWatchesGoodCase() throws Exception { hit.sourceRef(source); hits = new SearchHits(new SearchHit[]{hit}, 1, 1.0f); SearchResponse searchResponse2 = new SearchResponse( - new InternalSearchResponse(hits, null, null, null, false, null, 1), "_scrollId1", 1, 1, 0, 1, null, null); + new InternalSearchResponse(hits, null, null, null, false, null, 1), "_scrollId1", 1, 1, 0, 1, null, null); SearchResponse searchResponse3 = new SearchResponse(InternalSearchResponse.empty(), "_scrollId2", 1, 1, 0, 1, null, null); doAnswer(invocation -> { @@ -229,7 +228,7 @@ public void testFindTriggeredWatchesGoodCase() throws Exception { when(client.clearScroll(any())).thenReturn(clearScrollResponseFuture); clearScrollResponseFuture.onResponse(new ClearScrollResponse(true, 1)); - assertThat(triggeredWatchStore.validate(cs), is(true)); + assertThat(TriggeredWatchStore.validate(cs), is(true)); DateTime now = DateTime.now(UTC); ScheduleTriggerEvent triggerEvent = new ScheduleTriggerEvent(now, now); @@ -260,85 +259,86 @@ public void testFindTriggeredWatchesGoodCase() throws Exception { // the elasticsearch migration helper is doing reindex using aliases, so we have to // make sure that the watch store supports a single alias pointing to the watch index - public void testLoadStoreAsAlias() throws Exception { + public void testLoadStoreAsAlias() { ClusterState.Builder 
csBuilder = new ClusterState.Builder(new ClusterName("_name")); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); MetaData.Builder metaDataBuilder = MetaData.builder(); metaDataBuilder.put(IndexMetaData.builder("triggered-watches-alias").settings(indexSettings) - .putAlias(new AliasMetaData.Builder(TriggeredWatchStoreField.INDEX_NAME).build())); + .putAlias(new AliasMetaData.Builder(TriggeredWatchStoreField.INDEX_NAME).build())); final Index index = metaDataBuilder.get("triggered-watches-alias").getIndex(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); ShardId shardId = new ShardId(index, 0); indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId) - .addShard(TestShardRouting.newShardRouting(shardId, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); + .addShard(TestShardRouting.newShardRouting(shardId, "_node_id", null, true, ShardRoutingState.STARTED)) + .build()); indexRoutingTableBuilder.addReplica(); routingTableBuilder.add(indexRoutingTableBuilder.build()); csBuilder.metaData(metaDataBuilder); csBuilder.routingTable(routingTableBuilder.build()); ClusterState cs = csBuilder.build(); - assertThat(triggeredWatchStore.validate(cs), is(true)); + assertThat(TriggeredWatchStore.validate(cs), is(true)); } // the elasticsearch migration helper is doing reindex using aliases, so we have to // make sure that the watch store supports only a single index in an alias - public void testLoadingFailsWithTwoAliases() throws Exception { + public void testLoadingFailsWithTwoAliases() { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); MetaData.Builder metaDataBuilder = MetaData.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); metaDataBuilder.put(IndexMetaData.builder("triggered-watches-alias").settings(indexSettings) - .putAlias(new AliasMetaData.Builder(TriggeredWatchStoreField.INDEX_NAME).build())); + .putAlias(new 
AliasMetaData.Builder(TriggeredWatchStoreField.INDEX_NAME).build())); metaDataBuilder.put(IndexMetaData.builder("whatever").settings(indexSettings) - .putAlias(new AliasMetaData.Builder(TriggeredWatchStoreField.INDEX_NAME).build())); + .putAlias(new AliasMetaData.Builder(TriggeredWatchStoreField.INDEX_NAME).build())); final Index index = metaDataBuilder.get("triggered-watches-alias").getIndex(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting("triggered-watches-alias", 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); + .addShard(TestShardRouting.newShardRouting("triggered-watches-alias", 0, "_node_id", null, true, ShardRoutingState.STARTED)) + .build()); indexRoutingTableBuilder.addReplica(); final Index otherIndex = metaDataBuilder.get("whatever").getIndex(); IndexRoutingTable.Builder otherIndexRoutingTableBuilder = IndexRoutingTable.builder(otherIndex); otherIndexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting("whatever", 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); + .addShard(TestShardRouting.newShardRouting("whatever", 0, "_node_id", null, true, ShardRoutingState.STARTED)) + .build()); csBuilder.metaData(metaDataBuilder); csBuilder.routingTable(routingTableBuilder.build()); ClusterState cs = csBuilder.build(); - assertThat(triggeredWatchStore.validate(cs), is(false)); + IllegalStateException e = expectThrows(IllegalStateException.class, () -> TriggeredWatchStore.validate(cs)); + assertThat(e.getMessage(), is("Alias [.triggered_watches] points to more than one index")); } // this is a special condition that could lead to an NPE in earlier versions - public void testTriggeredWatchesIndexIsClosed() throws Exception { + public void 
testTriggeredWatchesIndexIsClosed() { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); MetaData.Builder metaDataBuilder = MetaData.builder(); metaDataBuilder.put(IndexMetaData.builder(TriggeredWatchStoreField.INDEX_NAME) - .settings(indexSettings) - .state(IndexMetaData.State.CLOSE)); + .settings(indexSettings) + .state(IndexMetaData.State.CLOSE)); csBuilder.metaData(metaDataBuilder); - assertThat(triggeredWatchStore.validate(csBuilder.build()), is(false)); + assertThat(TriggeredWatchStore.validate(csBuilder.build()), is(false)); } - public void testTriggeredWatchesIndexDoesNotExistOnStartup() throws Exception { + public void testTriggeredWatchesIndexDoesNotExistOnStartup() { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); ClusterState cs = csBuilder.build(); - assertThat(triggeredWatchStore.validate(cs), is(true)); + assertThat(TriggeredWatchStore.validate(cs), is(true)); Watch watch = mock(Watch.class); triggeredWatchStore.findTriggeredWatches(Collections.singletonList(watch), cs); verifyZeroInteractions(client); } - public void testIndexNotFoundButInMetaData() throws Exception { + public void testIndexNotFoundButInMetaData() { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); MetaData.Builder metaDataBuilder = MetaData.builder() - .put(IndexMetaData.builder(TriggeredWatchStoreField.INDEX_NAME).settings(indexSettings)); + .put(IndexMetaData.builder(TriggeredWatchStoreField.INDEX_NAME).settings(indexSettings)); csBuilder.metaData(metaDataBuilder); ClusterState cs = csBuilder.build(); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java index f2a0f4c311ad6..8f1cce9305571 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java +++ 
b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java @@ -42,7 +42,6 @@ import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.core.IsEqual.equalTo; import static org.joda.time.DateTimeZone.UTC; @@ -64,7 +63,6 @@ public void init() { when(client.threadPool()).thenReturn(threadPool); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); historyStore = new HistoryStore(Settings.EMPTY, client); - historyStore.start(); } public void testPut() throws Exception { @@ -80,7 +78,7 @@ public void testPut() throws Exception { IndexRequest request = (IndexRequest) invocation.getArguments()[0]; PlainActionFuture indexFuture = PlainActionFuture.newFuture(); if (request.id().equals(wid.value()) && request.type().equals(HistoryStore.DOC_TYPE) && request.opType() == OpType.CREATE - && request.index().equals(index)) { + && request.index().equals(index)) { indexFuture.onResponse(indexResponse); } else { indexFuture.onFailure(new ElasticsearchException("test issue")); @@ -92,32 +90,16 @@ public void testPut() throws Exception { verify(client).index(any()); } - public void testPutStopped() throws Exception { - Wid wid = new Wid("_name", new DateTime(0, UTC)); - ScheduleTriggerEvent event = new ScheduleTriggerEvent(wid.watchId(), new DateTime(0, UTC), new DateTime(0, UTC)); - WatchRecord watchRecord = new WatchRecord.MessageWatchRecord(wid, event, ExecutionState.EXECUTED, null, randomAlphaOfLength(10)); - - historyStore.stop(); - try { - historyStore.put(watchRecord); - fail("Expected IllegalStateException"); - } catch (IllegalStateException e) { - assertThat(e.getMessage(), is("unable to persist watch record history store is not ready")); - } 
finally { - historyStore.start(); - } - } - public void testIndexNameGeneration() { String indexTemplateVersion = INDEX_TEMPLATE_VERSION; assertThat(getHistoryIndexNameForTime(new DateTime(0, UTC)), - equalTo(".watcher-history-"+ indexTemplateVersion +"-1970.01.01")); + equalTo(".watcher-history-"+ indexTemplateVersion +"-1970.01.01")); assertThat(getHistoryIndexNameForTime(new DateTime(100000000000L, UTC)), - equalTo(".watcher-history-" + indexTemplateVersion + "-1973.03.03")); + equalTo(".watcher-history-" + indexTemplateVersion + "-1973.03.03")); assertThat(getHistoryIndexNameForTime(new DateTime(1416582852000L, UTC)), - equalTo(".watcher-history-" + indexTemplateVersion + "-2014.11.21")); + equalTo(".watcher-history-" + indexTemplateVersion + "-2014.11.21")); assertThat(getHistoryIndexNameForTime(new DateTime(2833165811000L, UTC)), - equalTo(".watcher-history-" + indexTemplateVersion + "-2059.10.12")); + equalTo(".watcher-history-" + indexTemplateVersion + "-2059.10.12")); } public void testStoreWithHideSecrets() throws Exception { diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java index 1e3a9e43a1019..71cc567d6046c 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java @@ -31,7 +31,6 @@ import org.hamcrest.Matchers; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; -import org.junit.Before; import java.util.Arrays; import java.util.List; @@ -62,11 +61,6 @@ protected boolean timeWarped() { return false; } - @Before - public void deleteAllWatchHistoryIndices() { - assertAcked(client().admin().indices().prepareDelete(HistoryStoreField.INDEX_PREFIX + "*")); - } - public void testLoadMalformedWatchRecord() 
throws Exception { client().prepareIndex(Watch.INDEX, Watch.DOC_TYPE, "_id") .setSource(jsonBuilder().startObject() @@ -141,8 +135,10 @@ public void testLoadMalformedWatchRecord() throws Exception { stopWatcher(); startWatcher(); - WatcherStatsResponse response = watcherClient().prepareWatcherStats().get(); - assertThat(response.getWatchesCount(), equalTo(1L)); + assertBusy(() -> { + WatcherStatsResponse response = watcherClient().prepareWatcherStats().get(); + assertThat(response.getWatchesCount(), equalTo(1L)); + }); } @AwaitsFix(bugUrl = "https://github.com/elastic/x-pack-elasticsearch/issues/1915") diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java index 8cb4ac1d07bf4..d98d6a44daf37 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java @@ -88,9 +88,7 @@ public void testAckSingleAction() throws Exception { assertThat(a1CountAfterAck, greaterThan(0L)); assertThat(a2CountAfterAck, greaterThan(0L)); - logger.info("###3"); timeWarp().trigger("_id", 4, TimeValue.timeValueSeconds(5)); - logger.info("###4"); flush(); refresh(); @@ -107,9 +105,7 @@ public void testAckSingleAction() throws Exception { assertEquals(DocWriteResponse.Result.DELETED, response.getResult()); refresh(); - logger.info("###5"); timeWarp().trigger("_id", 4, TimeValue.timeValueSeconds(5)); - logger.info("###6"); GetWatchResponse getWatchResponse = watcherClient().prepareGetWatch("_id").get(); assertThat(getWatchResponse.isFound(), is(true)); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsActionTests.java 
b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsActionTests.java index 94b356286daff..4eb1d709c3bad 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsActionTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsActionTests.java @@ -26,7 +26,7 @@ import org.elasticsearch.xpack.core.watcher.common.stats.Counters; import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsRequest; import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse; -import org.elasticsearch.xpack.watcher.WatcherService; +import org.elasticsearch.xpack.watcher.WatcherLifeCycleService; import org.elasticsearch.xpack.watcher.execution.ExecutionService; import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.junit.Before; @@ -59,8 +59,8 @@ public void setupTransportAction() { when(clusterState.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA); when(clusterService.state()).thenReturn(clusterState); - WatcherService watcherService = mock(WatcherService.class); - when(watcherService.state()).thenReturn(WatcherState.STARTED); + WatcherLifeCycleService watcherLifeCycleService = mock(WatcherLifeCycleService.class); + when(watcherLifeCycleService.getState()).thenReturn(WatcherState.STARTED); ExecutionService executionService = mock(ExecutionService.class); when(executionService.executionThreadPoolQueueSize()).thenReturn(100L); @@ -80,9 +80,9 @@ public void setupTransportAction() { secondTriggerServiceStats.inc("foo.bar.baz", 1024); when(triggerService.stats()).thenReturn(firstTriggerServiceStats, secondTriggerServiceStats); - action = new TransportWatcherStatsAction(Settings.EMPTY, transportService, - clusterService, threadPool, new ActionFilters(Collections.emptySet()), - new IndexNameExpressionResolver(Settings.EMPTY), 
watcherService, executionService, triggerService); + action = new TransportWatcherStatsAction(Settings.EMPTY, transportService, clusterService, threadPool, new + ActionFilters(Collections.emptySet()), new IndexNameExpressionResolver(Settings.EMPTY), watcherLifeCycleService, + executionService, triggerService); } public void testWatcherStats() throws Exception { @@ -92,7 +92,7 @@ public void testWatcherStats() throws Exception { WatcherStatsResponse.Node nodeResponse2 = action.nodeOperation(new WatcherStatsRequest.Node(request, "nodeId2")); WatcherStatsResponse response = action.newResponse(request, - Arrays.asList(nodeResponse1, nodeResponse2), Collections.emptyList()); + Arrays.asList(nodeResponse1, nodeResponse2), Collections.emptyList()); assertThat(response.getWatchesCount(), is(40L)); try (XContentBuilder builder = jsonBuilder()) { @@ -107,4 +107,4 @@ public void testWatcherStats() throws Exception { assertThat(objectPath.evaluate("stats.1.stats.whatever"), is(1)); } } -} \ No newline at end of file +} diff --git a/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java b/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java index 6581de8fa26fb..35a70a0aaeb64 100644 --- a/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java +++ b/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java @@ -20,10 +20,10 @@ import org.junit.Before; import java.io.IOException; -import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import static java.util.Collections.emptyMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -38,28 +38,64 @@ 
public class SmokeTestWatcherTestSuiteIT extends ESRestTestCase { @Before public void startWatcher() throws Exception { + // delete the watcher history to not clutter with entries from other tests + assertOK(adminClient().performRequest("DELETE", ".watcher-history-*")); + assertBusy(() -> { - adminClient().performRequest("POST", "_xpack/watcher/_start"); + Response response = adminClient().performRequest("GET", "_xpack/watcher/stats"); + String state = ObjectPath.createFromResponse(response).evaluate("stats.0.watcher_state"); + + switch (state) { + case "stopped": + Response startResponse = adminClient().performRequest("POST", "/_xpack/watcher/_start"); + boolean isAcknowledged = ObjectPath.createFromResponse(startResponse).evaluate("acknowledged"); + assertThat(isAcknowledged, is(true)); + break; + case "stopping": + throw new AssertionError("waiting until stopping state reached stopped state to start again"); + case "starting": + throw new AssertionError("waiting until starting state reached started state"); + case "started": + // all good here, we are done + break; + default: + throw new AssertionError("unknown state[" + state + "]"); + } + }); + assertBusy(() -> { for (String template : WatcherIndexTemplateRegistryField.TEMPLATE_NAMES) { - assertOK(adminClient().performRequest("HEAD", "_template/" + template)); + Response templateExistsResponse = adminClient().performRequest("HEAD", "_template/" + template, emptyMap()); + assertThat(templateExistsResponse.getStatusLine().getStatusCode(), is(200)); } - - Response statsResponse = adminClient().performRequest("GET", "_xpack/watcher/stats"); - ObjectPath objectPath = ObjectPath.createFromResponse(statsResponse); - String state = objectPath.evaluate("stats.0.watcher_state"); - assertThat(state, is("started")); }); + + // TODO why does the test fail without this? reloading issue with the new approach? Make sure to write a unit test!
+ assertOK(adminClient().performRequest("PUT", ".watches")); } @After public void stopWatcher() throws Exception { assertBusy(() -> { - adminClient().performRequest("POST", "_xpack/watcher/_stop", Collections.emptyMap()); - Response statsResponse = adminClient().performRequest("GET", "_xpack/watcher/stats"); - ObjectPath objectPath = ObjectPath.createFromResponse(statsResponse); - String state = objectPath.evaluate("stats.0.watcher_state"); - assertThat(state, is("stopped")); + Response response = adminClient().performRequest("GET", "_xpack/watcher/stats", emptyMap()); + String state = ObjectPath.createFromResponse(response).evaluate("stats.0.watcher_state"); + + switch (state) { + case "stopped": + // all good here, we are done + break; + case "stopping": + throw new AssertionError("waiting until stopping state reached stopped state"); + case "starting": + throw new AssertionError("waiting until starting state reached started state to stop"); + case "started": + Response stopResponse = adminClient().performRequest("POST", "/_xpack/watcher/_stop", emptyMap()); + boolean isAcknowledged = ObjectPath.createFromResponse(stopResponse).evaluate("acknowledged"); + assertThat(isAcknowledged, is(true)); + break; + default: + throw new AssertionError("unknown state[" + state + "]"); + } }); } @@ -99,18 +135,18 @@ public void testMonitorClusterHealth() throws Exception { builder.startObject("trigger").startObject("schedule").field("interval", "1s").endObject().endObject(); // input builder.startObject("input").startObject("http").startObject("request").field("host", host).field("port", port) - .field("path", "/_cluster/health") - .field("scheme", "http") - .startObject("auth").startObject("basic") - .field("username", TEST_ADMIN_USERNAME).field("password", TEST_ADMIN_PASSWORD) - .endObject().endObject() - .endObject().endObject().endObject(); + .field("path", "/_cluster/health") + .field("scheme", "http") + .startObject("auth").startObject("basic") + .field("username", 
TEST_ADMIN_USERNAME).field("password", TEST_ADMIN_PASSWORD) + .endObject().endObject() + .endObject().endObject().endObject(); // condition builder.startObject("condition").startObject("compare").startObject("ctx.payload.number_of_data_nodes").field("lt", 10) - .endObject().endObject().endObject(); + .endObject().endObject().endObject(); // actions builder.startObject("actions").startObject("log").startObject("logging").field("text", "executed").endObject().endObject() - .endObject(); + .endObject(); builder.endObject(); @@ -132,7 +168,7 @@ public void testMonitorClusterHealth() throws Exception { private void indexWatch(String watchId, XContentBuilder builder) throws Exception { StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON); - Response response = client().performRequest("PUT", "_xpack/watcher/watch/" + watchId, Collections.emptyMap(), entity); + Response response = client().performRequest("PUT", "_xpack/watcher/watch/" + watchId, emptyMap(), entity); assertOK(response); Map responseMap = entityAsMap(response); assertThat(responseMap, hasEntry("_id", watchId)); @@ -155,14 +191,14 @@ private ObjectPath getWatchHistoryEntry(String watchId) throws Exception { builder.startObject(); builder.startObject("query").startObject("bool").startArray("must"); builder.startObject().startObject("term").startObject("watch_id").field("value", watchId).endObject().endObject() - .endObject(); + .endObject(); builder.endArray().endObject().endObject(); builder.startArray("sort").startObject().startObject("trigger_event.triggered_time").field("order", "desc").endObject() - .endObject().endArray(); + .endObject().endArray(); builder.endObject(); StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON); - Response response = client().performRequest("POST", ".watcher-history-*/_search", Collections.emptyMap(), entity); + Response response = client().performRequest("POST", ".watcher-history-*/_search", 
emptyMap(), entity); ObjectPath objectPath = ObjectPath.createFromResponse(response); int totalHits = objectPath.evaluate("hits.total"); assertThat(totalHits, is(greaterThanOrEqualTo(1))); From 14f0d9a46bbbef83e350ec8ad60ce09a9fbb3d0d Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 25 Apr 2018 23:40:12 +0200 Subject: [PATCH 2/4] review comment: remove synchronized --- .../elasticsearch/xpack/watcher/execution/ExecutionService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index ae6854833b827..6901adb0a6937 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -116,7 +116,7 @@ public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredW this.currentExecutions.set(new CurrentExecutions()); } - public synchronized void unPause() { + public void unPause() { paused.set(false); } From 180c4377fc5fb8b36a93264d2fd40bd5a66d53b6 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Thu, 26 Apr 2018 16:46:28 +0200 Subject: [PATCH 3/4] Tests: delete old watches before each REST test --- .../xpack/test/rest/XPackRestIT.java | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java index b5f2003bbd2de..99a6e29e334f6 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java @@ -8,6 +8,7 @@ import 
com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import org.apache.http.HttpStatus; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.client.Response; import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -17,6 +18,7 @@ import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; import org.elasticsearch.test.rest.yaml.ClientYamlTestResponse; import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase; +import org.elasticsearch.test.rest.yaml.ObjectPath; import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; @@ -83,7 +85,6 @@ private void waitForTemplates() throws Exception { templates.addAll(Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME, AnomalyDetectorsIndex.jobStateIndexName(), AnomalyDetectorsIndex.jobResultsIndexPrefix())); - templates.addAll(Arrays.asList(WatcherIndexTemplateRegistryField.TEMPLATE_NAMES)); for (String template : templates) { awaitCallApi("indices.exists_template", singletonMap("name", template), emptyList(), @@ -119,6 +120,27 @@ private void waitForWatcher() throws Exception { throw new AssertionError("unknown state[" + state + "]"); } }); + + for (String template : WatcherIndexTemplateRegistryField.TEMPLATE_NAMES) { + awaitCallApi("indices.exists_template", singletonMap("name", template), emptyList(), + response -> true, + () -> "Exception when waiting for [" + template + "] template to be created"); + } + + boolean existsWatcherIndex = adminClient().performRequest("HEAD", ".watches").getStatusLine().getStatusCode() == 200; + if (existsWatcherIndex == false) { + return; + } + Response response = adminClient().performRequest("GET", ".watches/_search", Collections.singletonMap("size", "1000")); + ObjectPath 
objectPathResponse = ObjectPath.createFromResponse(response); + int totalHits = objectPathResponse.evaluate("hits.total"); + if (totalHits > 0) { + List> hits = objectPathResponse.evaluate("hits.hits"); + for (Map hit : hits) { + String id = (String) hit.get("_id"); + assertOK(adminClient().performRequest("DELETE", "_xpack/watcher/watch/" + id)); + } + } } } From ec4ea395b78d73bc0081fa5c7b9d6f3f20ed743b Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 2 May 2018 18:15:36 +0200 Subject: [PATCH 4/4] Added changelog entry --- docs/CHANGELOG.asciidoc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc index 7fc5c48d73acd..9e25293d4465c 100644 --- a/docs/CHANGELOG.asciidoc +++ b/docs/CHANGELOG.asciidoc @@ -90,6 +90,11 @@ option. ({pull}30140[#29658]) Added new "Request" object flavored request methods. Prefer these instead of the multi-argument versions. ({pull}29623[#29623]) +The cluster state listener to decide if watcher should be +stopped/started/paused now runs far less code in an executor but is more +synchronous and predictable. Also the trigger engine thread is only started on +data nodes. And the Execute Watch API can be triggered regardless of whether +watcher is started or stopped. ({pull}30118[#30118]) [float] === Bug Fixes