Skip to content

Commit

Permalink
Watcher: Ensure trigger service pauses execution (#30363)
Browse files Browse the repository at this point in the history
When the watcher service pauses execution due to a cluster state update,
the trigger service and its engines also need to pause properly instead
of keeping going. This is also important when the .watches index is 
deleted, so that watches don't stay in a triggered mode.
  • Loading branch information
spinscale authored May 4, 2018
1 parent 137ce70 commit 0e6cbbd
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ void shutDown() {
void stopExecutor() {
ThreadPool.terminate(executor, 10L, TimeUnit.SECONDS);
}

/**
* Reload the watcher service, does not switch the state from stopped to started, just keep going
* @param state cluster state, which is needed to find out about local shards
Expand Down Expand Up @@ -231,6 +232,7 @@ private synchronized void reloadInner(ClusterState state, String reason, boolean
* manual watch execution, i.e. via the execute watch API
*/
public void pauseExecution(String reason) {
triggerService.pauseExecution();
int cancelledTaskCount = executionService.pause();
logger.info("paused watch execution, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,14 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.watcher.trigger.Trigger;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore;
import org.elasticsearch.xpack.watcher.input.none.ExecutableNoneInput;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.elasticsearch.xpack.watcher.watch.WatchParser;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -204,6 +208,36 @@ void stopExecutor() {
assertThat(watches, hasSize(activeWatchCount));
}

public void testPausingWatcherServiceAlsoPausesTriggerService() {
String engineType = "foo";
TriggerEngine triggerEngine = mock(TriggerEngine.class);
when(triggerEngine.type()).thenReturn(engineType);
TriggerService triggerService = new TriggerService(Settings.EMPTY, Collections.singleton(triggerEngine));

Trigger trigger = mock(Trigger.class);
when(trigger.type()).thenReturn(engineType);

Watch watch = mock(Watch.class);
when(watch.trigger()).thenReturn(trigger);
when(watch.condition()).thenReturn(InternalAlwaysCondition.INSTANCE);
ExecutableNoneInput noneInput = new ExecutableNoneInput(logger);
when(watch.input()).thenReturn(noneInput);

triggerService.add(watch);
assertThat(triggerService.count(), is(1L));

WatcherService service = new WatcherService(Settings.EMPTY, triggerService, mock(TriggeredWatchStore.class),
mock(ExecutionService.class), mock(WatchParser.class), mock(Client.class), executorService) {
@Override
void stopExecutor() {
}
};

service.pauseExecution("pausing");
assertThat(triggerService.count(), is(0L));
verify(triggerEngine).pauseExecution();
}

private static DiscoveryNode newNode() {
return new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
new HashSet<>(asList(DiscoveryNode.Role.values())), Version.CURRENT);
Expand Down

0 comments on commit 0e6cbbd

Please sign in to comment.