From a7a53669b1dee3681d1db48d3a78840630e9a59c Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Wed, 13 Mar 2019 14:49:41 -0600
Subject: [PATCH 1/2] Handle snapshot lifecycle policy updates and deletions

This adds logic to `SnapshotLifecycleService` to handle updates and
deletes for snapshot policies. Policies with incremented versions have
the old policy cancelled and the new one scheduled. Deleted policies
have their schedules cancelled when they are no longer present in the
cluster state metadata.

Relates to #38461
---
 .../xpack/indexlifecycle/IndexLifecycle.java  |   4 +-
 .../SnapshotLifecycleMetadata.java            |   5 +-
 .../SnapshotLifecycleService.java             |  91 +++++--
 .../SnapshotLifecycleTask.java                |   2 +-
 .../SnapshotLifecycleServiceTests.java        | 230 ++++++++++++++++++
 5 files changed, 313 insertions(+), 19 deletions(-)
 create mode 100644 x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleServiceTests.java

diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java
index 664c0d07b74df..e1cfb7716862a 100644
--- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java
@@ -81,6 +81,7 @@ import org.elasticsearch.xpack.indexlifecycle.action.TransportStartILMAction;
 import org.elasticsearch.xpack.indexlifecycle.action.TransportStopILMAction;
 import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleService;
+import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleTask;
 import org.elasticsearch.xpack.snapshotlifecycle.action.DeleteSnapshotLifecycleAction;
 import org.elasticsearch.xpack.snapshotlifecycle.action.GetSnapshotLifecycleAction;
 import org.elasticsearch.xpack.snapshotlifecycle.action.PutSnapshotLifecycleAction;
@@ -151,7 +152,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
         }
         indexLifecycleInitialisationService.set(new IndexLifecycleService(settings, client, clusterService, threadPool,
             getClock(), System::currentTimeMillis, xContentRegistry));
-        snapshotLifecycleService.set(new SnapshotLifecycleService(settings, client, clusterService, getClock()));
+        snapshotLifecycleService.set(new SnapshotLifecycleService(settings,
+            () -> new SnapshotLifecycleTask(client), clusterService, getClock()));
         return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get());
     }
 
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleMetadata.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleMetadata.java
index dbeb2686affcb..3de5a3002bdef 100644
--- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleMetadata.java
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleMetadata.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -35,7 +36,7 @@ public class SnapshotLifecycleMetadata implements XPackMetaDataCustom {
     private final Map<String, SnapshotLifecyclePolicyMetadata> snapshotConfigurations;
 
     public SnapshotLifecycleMetadata(Map<String, SnapshotLifecyclePolicyMetadata> snapshotConfigurations) {
-        this.snapshotConfigurations = Collections.unmodifiableMap(snapshotConfigurations);
+        this.snapshotConfigurations = new HashMap<>(snapshotConfigurations);
         // TODO: maybe operation mode here so it can be disabled/re-enabled separately like ILM is
     }
 
@@ -44,7 +45,7 @@ public SnapshotLifecycleMetadata(StreamInput in) throws IOException {
     }
 
     public Map<String, SnapshotLifecyclePolicyMetadata> getSnapshotConfigurations() {
-        return this.snapshotConfigurations;
+        return Collections.unmodifiableMap(this.snapshotConfigurations);
     }
 
     @Override
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleService.java
index c281d8e63874e..83c1c05a8f648 100644
--- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleService.java
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleService.java
@@ -8,7 +8,6 @@
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateListener;
@@ -23,6 +22,10 @@
 import java.io.Closeable;
 import java.time.Clock;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * {@code SnapshotLifecycleService} manages snapshot policy scheduling and triggering of the
@@ -32,6 +35,7 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closeable, ClusterStateListener {
 
     private static final Logger logger = LogManager.getLogger(SnapshotLifecycleMetadata.class);
+    private static final String JOB_PATTERN_SUFFIX = "-\\d+$";
 
     private final SchedulerEngine scheduler;
     private final ClusterService clusterService;
@@ -39,21 +43,23 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
     private final Map<String, SchedulerEngine.Job> scheduledTasks = ConcurrentCollections.newConcurrentMap();
     private volatile boolean isMaster = false;
 
-    public SnapshotLifecycleService(Settings settings, Client client, ClusterService clusterService,
+    public SnapshotLifecycleService(Settings settings,
+                                    Supplier<SnapshotLifecycleTask> taskSupplier,
+                                    ClusterService clusterService,
                                     Clock clock) {
         this.scheduler = new SchedulerEngine(settings, clock);
         this.clusterService = clusterService;
-        this.snapshotTask = new SnapshotLifecycleTask(client);
+        this.snapshotTask = taskSupplier.get();
         clusterService.addLocalNodeMasterListener(this); // TODO: change this not to use 'this'
         clusterService.addListener(this);
     }
 
     @Override
-    public void clusterChanged(ClusterChangedEvent event) {
+    public void clusterChanged(final ClusterChangedEvent event) {
         if (this.isMaster) {
-            // TODO: handle modified policies (currently they are ignored)
-            // TODO: handle deleted policies
-            scheduleSnapshotJobs(event.state());
+            final ClusterState state = event.state();
+            scheduleSnapshotJobs(state);
+            cleanupDeletedPolicies(state);
         }
     }
 
@@ -71,6 +77,11 @@ public void offMaster() {
         cancelSnapshotJobs();
     }
 
+    // Only used for testing
+    SchedulerEngine getScheduler() {
+        return this.scheduler;
+    }
+
     /**
      * Schedule all non-scheduled snapshot jobs contained in the cluster state
      */
@@ -81,35 +92,85 @@ public void scheduleSnapshotJobs(final ClusterState state) {
         }
     }
 
+    public void cleanupDeletedPolicies(final ClusterState state) {
+        SnapshotLifecycleMetadata snapMeta = state.metaData().custom(SnapshotLifecycleMetadata.TYPE);
+        if (snapMeta != null) {
+            // Retrieve all of the expected policy job ids from the policies in the metadata
+            final Set<String> policyJobIds = snapMeta.getSnapshotConfigurations().values().stream()
+                .map(SnapshotLifecycleService::getJobId)
+                .collect(Collectors.toSet());
+
+            // Cancel all scheduled jobs whose policy job id is *NOT* present in the metadata (i.e. deleted policies)
+            scheduledTasks.keySet().stream()
+                .filter(jobId -> policyJobIds.contains(jobId) == false)
+                .forEach(this::cancelScheduledSnapshot);
+        }
+    }
+
     /**
-     * Schedule the {@link SnapshotLifecyclePolicy} job if it does not already exist. If the job already
-     * exists it is not interfered with.
+     * Schedule the {@link SnapshotLifecyclePolicy} job if it does not already exist. First checks
+     * to see if any previous versions of the policy were scheduled, and if so, cancels those. If
+     * the same version of a policy has already been scheduled it does not overwrite the job.
      */
     public void maybeScheduleSnapshot(final SnapshotLifecyclePolicyMetadata snapshotLifecyclePolicy) {
-        final String jobId = snapshotLifecyclePolicy.getPolicy().getId();
+        final String jobId = getJobId(snapshotLifecyclePolicy);
+        final Pattern existingJobPattern = Pattern.compile(snapshotLifecyclePolicy.getPolicy().getId() + JOB_PATTERN_SUFFIX);
+
+        // Find and cancel any existing jobs for this policy
+        final boolean existingJobsFoundAndCancelled = scheduledTasks.keySet().stream()
+            // Find all jobs matching the `<policy-id>-\d+` pattern
+            .filter(jId -> existingJobPattern.matcher(jId).matches())
+            // Filter out a job that has not been changed (matches the id exactly, meaning the version is the same)
+            .filter(jId -> jId.equals(jobId) == false)
+            .map(existingJobId -> {
+                // Cancel the existing job so the new one can be scheduled
+                logger.debug("removing existing snapshot lifecycle job [{}] as it has been updated", existingJobId);
+                scheduledTasks.remove(existingJobId);
+                boolean existed = scheduler.remove(existingJobId);
+                assert existed : "expected job for " + existingJobId + " to exist in scheduler";
+                return existed;
+            })
+            .reduce(false, (a, b) -> a || b);
+
+        // Now atomically schedule the new job and add it to the scheduled tasks map. If the jobId
+        // is identical to an existing job (meaning the version has not changed) then this does
+        // not reschedule it.
         scheduledTasks.computeIfAbsent(jobId, id -> {
             final SchedulerEngine.Job job = new SchedulerEngine.Job(jobId,
                 new CronSchedule(snapshotLifecyclePolicy.getPolicy().getSchedule()));
-            logger.info("scheduling snapshot lifecycle job [{}]", jobId);
+            if (existingJobsFoundAndCancelled) {
+                logger.info("rescheduling updated snapshot lifecycle job [{}]", jobId);
+            } else {
+                logger.info("scheduling snapshot lifecycle job [{}]", jobId);
+            }
             scheduler.add(job);
             return job;
         });
     }
 
+    /**
+     * Generate the job id for a given policy metadata. The job id is {@code <policy-id>-<version>}
+     */
+    static String getJobId(SnapshotLifecyclePolicyMetadata policyMeta) {
+        return policyMeta.getPolicy().getId() + "-" + policyMeta.getVersion();
+    }
+
     /**
      * Cancel all scheduled snapshot jobs
      */
     public void cancelSnapshotJobs() {
+        logger.trace("cancelling all snapshot lifecycle jobs");
         scheduler.scheduledJobIds().forEach(scheduler::remove);
         scheduledTasks.clear();
     }
 
     /**
-     * Cancel the given snapshot lifecycle id
+     * Cancel the given policy job id (from {@link #getJobId(SnapshotLifecyclePolicyMetadata)})
      */
-    public void cancelScheduledSnapshot(final String snapshotLifecycleId) {
-        scheduledTasks.remove(snapshotLifecycleId);
-        scheduler.remove(snapshotLifecycleId);
+    public void cancelScheduledSnapshot(final String lifecycleJobId) {
+        logger.debug("cancelling snapshot lifecycle job [{}] as it no longer exists", lifecycleJobId);
+        scheduledTasks.remove(lifecycleJobId);
+        scheduler.remove(lifecycleJobId);
     }
 
     @Override
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleTask.java
index 3c1b1df34fddc..c8b1db38035e8 100644
--- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleTask.java
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleTask.java
@@ -17,7 +17,7 @@ public class SnapshotLifecycleTask implements SchedulerEngine.Listener {
 
     private final Client client;
 
-    SnapshotLifecycleTask(final Client client) {
+    public SnapshotLifecycleTask(final Client client) {
         this.client = client;
     }
 
diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleServiceTests.java
new file mode 100644
index 0000000000000..de60e519ae1d7
--- /dev/null
+++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleServiceTests.java
@@ -0,0 +1,230 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.snapshotlifecycle;
+
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ClusterServiceUtils;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
+import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+
+public class SnapshotLifecycleServiceTests extends ESTestCase {
+
+    public void testGetJobId() {
+        String id = randomAlphaOfLengthBetween(1, 10) + (randomBoolean() ? "" : randomLong());
+        SnapshotLifecyclePolicy policy = createPolicy(id);
+        long version = randomNonNegativeLong();
+        SnapshotLifecyclePolicyMetadata meta = new SnapshotLifecyclePolicyMetadata(policy, Collections.emptyMap(), version, 1);
+        assertThat(SnapshotLifecycleService.getJobId(meta), equalTo(id + "-" + version));
+    }
+
+    /**
+     * Test new policies getting scheduled correctly, updated policies also being scheduled,
+     * and deleted policies having their schedules cancelled.
+     */
+    public void testPolicyCRUD() throws Exception {
+        ClockMock clock = new ClockMock();
+        final AtomicInteger triggerCount = new AtomicInteger(0);
+        final AtomicReference<Consumer<SchedulerEngine.Event>> trigger = new AtomicReference<>(e -> triggerCount.incrementAndGet());
+        try (ThreadPool threadPool = new TestThreadPool("test");
+             ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
+             SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
+                 () -> new FakeSnapshotTask(e -> trigger.get().accept(e)), clusterService, clock)) {
+
+            sls.offMaster();
+            SnapshotLifecycleMetadata snapMeta = new SnapshotLifecycleMetadata(Collections.emptyMap());
+            ClusterState previousState = createState(snapMeta);
+            Map<String, SnapshotLifecyclePolicyMetadata> policies = new HashMap<>();
+
+            SnapshotLifecyclePolicyMetadata policy =
+                new SnapshotLifecyclePolicyMetadata(createPolicy("foo", "*/1 * * * * ?"), // trigger every second
+                    Collections.emptyMap(), 1, 1);
+            policies.put(policy.getPolicy().getId(), policy);
+            snapMeta = new SnapshotLifecycleMetadata(policies);
+            ClusterState state = createState(snapMeta);
+            ClusterChangedEvent event = new ClusterChangedEvent("1", state, previousState);
+            sls.clusterChanged(event);
+
+            // Since the service does not think it is master, it should not be triggered
+            assertThat(triggerCount.get(), equalTo(0));
+            assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
+
+            // Change the service to think it's on the master node, events should be scheduled now
+            sls.onMaster();
+            sls.clusterChanged(event);
+            assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-1")));
+
+            assertBusy(() -> assertThat(triggerCount.get(), greaterThan(0)));
+
+            clock.freeze();
+            int currentCount = triggerCount.get();
+            previousState = state;
+            SnapshotLifecyclePolicyMetadata newPolicy =
+                new SnapshotLifecyclePolicyMetadata(createPolicy("foo", "*/1 * * * * ?"), Collections.emptyMap(), 2, 2);
+            policies.put(policy.getPolicy().getId(), newPolicy);
+            state = createState(new SnapshotLifecycleMetadata(policies));
+            event = new ClusterChangedEvent("2", state, previousState);
+            sls.clusterChanged(event);
+            assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-2")));
+
+            trigger.set(e -> {
+                // Make sure the job got updated
+                assertThat(e.getJobName(), equalTo("foo-2"));
+                triggerCount.incrementAndGet();
+            });
+            clock.fastForwardSeconds(1);
+
+            assertBusy(() -> assertThat(triggerCount.get(), greaterThan(currentCount)));
+
+            final int currentCount2 = triggerCount.get();
+            previousState = state;
+            // Create a state simulating the policy being deleted
+            state = createState(new SnapshotLifecycleMetadata(Collections.emptyMap()));
+            event = new ClusterChangedEvent("2", state, previousState);
+            sls.clusterChanged(event);
+            clock.fastForwardSeconds(2);
+
+            // The existing job should be cancelled and no longer trigger
+            assertThat(triggerCount.get(), equalTo(currentCount2));
+            assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
+
+            // When the service is no longer master, all jobs should be automatically cancelled
+            policy =
+                new SnapshotLifecyclePolicyMetadata(createPolicy("foo", "*/1 * * * * ?"), // trigger every second
+                    Collections.emptyMap(), 3, 1);
+            policies.put(policy.getPolicy().getId(), policy);
+            snapMeta = new SnapshotLifecycleMetadata(policies);
+            previousState = state;
+            state = createState(snapMeta);
+            event = new ClusterChangedEvent("1", state, previousState);
+            trigger.set(e -> triggerCount.incrementAndGet());
+            sls.clusterChanged(event);
+            clock.fastForwardSeconds(2);
+
+            // Make sure at least one triggers and the job is scheduled
+            assertBusy(() -> assertThat(triggerCount.get(), greaterThan(currentCount2)));
+            assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-3")));
+
+            // Signify becoming non-master, the jobs should all be cancelled
+            sls.offMaster();
+            assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
+
+            threadPool.shutdownNow();
+        }
+    }
+
+    /**
+     * Test that the way job ids are generated does not cause confusion for policy ids ending in numbers
+     */
+    public void testPolicyNamesEndingInNumbers() throws Exception {
+        ClockMock clock = new ClockMock();
+        final AtomicInteger triggerCount = new AtomicInteger(0);
+        final AtomicReference<Consumer<SchedulerEngine.Event>> trigger = new AtomicReference<>(e -> triggerCount.incrementAndGet());
+        try (ThreadPool threadPool = new TestThreadPool("test");
+             ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
+             SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
+                 () -> new FakeSnapshotTask(e -> trigger.get().accept(e)), clusterService, clock)) {
+            sls.onMaster();
+
+            SnapshotLifecycleMetadata snapMeta = new SnapshotLifecycleMetadata(Collections.emptyMap());
+            ClusterState previousState = createState(snapMeta);
+            Map<String, SnapshotLifecyclePolicyMetadata> policies = new HashMap<>();
+
+            SnapshotLifecyclePolicyMetadata policy =
+                new SnapshotLifecyclePolicyMetadata(createPolicy("foo-2", "30 * * * * ?"),
+                    Collections.emptyMap(), 1, 1);
+            policies.put(policy.getPolicy().getId(), policy);
+            snapMeta = new SnapshotLifecycleMetadata(policies);
+            ClusterState state = createState(snapMeta);
+            ClusterChangedEvent event = new ClusterChangedEvent("1", state, previousState);
+            sls.clusterChanged(event);
+
+            assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-2-1")));
+
+            previousState = state;
+            SnapshotLifecyclePolicyMetadata secondPolicy =
+                new SnapshotLifecyclePolicyMetadata(createPolicy("foo-1", "45 * * * * ?"),
+                    Collections.emptyMap(), 2, 1);
+            policies.put(secondPolicy.getPolicy().getId(), secondPolicy);
+            snapMeta = new SnapshotLifecycleMetadata(policies);
+            state = createState(snapMeta);
+            event = new ClusterChangedEvent("2", state, previousState);
+            sls.clusterChanged(event);
+
+            assertThat(sls.getScheduler().scheduledJobIds(), containsInAnyOrder("foo-2-1", "foo-1-2"));
+
+            sls.offMaster();
+            assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
+
+            threadPool.shutdownNow();
+        }
+    }
+
+    class FakeSnapshotTask extends SnapshotLifecycleTask {
+        private final Consumer<SchedulerEngine.Event> onTriggered;
+
+        FakeSnapshotTask(Consumer<SchedulerEngine.Event> onTriggered) {
+            super(null);
+            this.onTriggered = onTriggered;
+        }
+
+        @Override
+        public void triggered(SchedulerEngine.Event event) {
+            logger.info("--> fake snapshot task triggered");
+            onTriggered.accept(event);
+        }
+    }
+
+    public ClusterState createState(SnapshotLifecycleMetadata snapMeta) {
+        MetaData metaData = MetaData.builder()
+            .putCustom(SnapshotLifecycleMetadata.TYPE, snapMeta)
+            .build();
+        return ClusterState.builder(new ClusterName("cluster"))
+            .metaData(metaData)
+            .build();
+    }
+
+    public SnapshotLifecyclePolicy createPolicy(String id) {
+        return createPolicy(id, randomSchedule());
+    }
+
+    public SnapshotLifecyclePolicy createPolicy(String id, String schedule) {
+        Map<String, Object> config = new HashMap<>();
+        config.put("ignore_unavailable", randomBoolean());
+        List<String> indices = new ArrayList<>();
+        indices.add("foo-*");
+        indices.add(randomAlphaOfLength(4));
+        config.put("indices", indices);
+        return new SnapshotLifecyclePolicy(id, randomAlphaOfLength(4), schedule, randomAlphaOfLength(4), config);
+    }
+
+    private String randomSchedule() {
+        return randomIntBetween(0, 59) + " " +
+            randomIntBetween(0, 59) + " " +
+            randomIntBetween(0, 12) + " * * ?";
+    }
+}

From 15e77cbd2ee4b89236699e966fc9e36d573429d0 Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Mon, 18 Mar 2019 07:11:46 -0600
Subject: [PATCH 2/2] Use a less racy way of checking that the job is not triggered

---
 .../snapshotlifecycle/SnapshotLifecycleServiceTests.java | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleServiceTests.java
index de60e519ae1d7..599fa747d2eec 100644
--- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleServiceTests.java
+++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleServiceTests.java
@@ -67,14 +67,17 @@ public void testPolicyCRUD() throws Exception {
             snapMeta = new SnapshotLifecycleMetadata(policies);
             ClusterState state = createState(snapMeta);
             ClusterChangedEvent event = new ClusterChangedEvent("1", state, previousState);
+            trigger.set(e -> {
+                fail("trigger should not be invoked");
+            });
             sls.clusterChanged(event);
 
-            // Since the service does not think it is master, it should not be triggered
-            assertThat(triggerCount.get(), equalTo(0));
+            // Since the service does not think it is master, it should not be triggered or scheduled
             assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
 
             // Change the service to think it's on the master node, events should be scheduled now
             sls.onMaster();
+            trigger.set(e -> triggerCount.incrementAndGet());
             sls.clusterChanged(event);
             assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-1")));