diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java
index 664c0d07b74df..e1cfb7716862a 100644
--- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java
@@ -81,6 +81,7 @@
 import org.elasticsearch.xpack.indexlifecycle.action.TransportStartILMAction;
 import org.elasticsearch.xpack.indexlifecycle.action.TransportStopILMAction;
 import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleService;
+import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleTask;
 import org.elasticsearch.xpack.snapshotlifecycle.action.DeleteSnapshotLifecycleAction;
 import org.elasticsearch.xpack.snapshotlifecycle.action.GetSnapshotLifecycleAction;
 import org.elasticsearch.xpack.snapshotlifecycle.action.PutSnapshotLifecycleAction;
@@ -151,7 +152,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
         }
         indexLifecycleInitialisationService.set(new IndexLifecycleService(settings, client, clusterService, threadPool,
             getClock(), System::currentTimeMillis, xContentRegistry));
-        snapshotLifecycleService.set(new SnapshotLifecycleService(settings, client, clusterService, getClock()));
+        snapshotLifecycleService.set(new SnapshotLifecycleService(settings,
+            () -> new SnapshotLifecycleTask(client), clusterService, getClock()));
         return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get());
     }
 
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleMetadata.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleMetadata.java
index dbeb2686affcb..3de5a3002bdef 100644
--- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleMetadata.java
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleMetadata.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -35,7 +36,7 @@ public class SnapshotLifecycleMetadata implements XPackMetaDataCustom {
     private final Map<String, SnapshotLifecyclePolicyMetadata> snapshotConfigurations;
 
     public SnapshotLifecycleMetadata(Map<String, SnapshotLifecyclePolicyMetadata> snapshotConfigurations) {
-        this.snapshotConfigurations = Collections.unmodifiableMap(snapshotConfigurations);
+        this.snapshotConfigurations = new HashMap<>(snapshotConfigurations);
         // TODO: maybe operation mode here so it can be disabled/re-enabled separately like ILM is
     }
 
@@ -44,7 +45,7 @@ public SnapshotLifecycleMetadata(StreamInput in) throws IOException {
     }
 
     public Map<String, SnapshotLifecyclePolicyMetadata> getSnapshotConfigurations() {
-        return this.snapshotConfigurations;
+        return Collections.unmodifiableMap(this.snapshotConfigurations);
     }
 
     @Override
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleService.java
index c281d8e63874e..83c1c05a8f648 100644
--- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleService.java
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleService.java
@@ -8,7 +8,6 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateListener;
@@ -23,6 +22,10 @@
 import java.io.Closeable;
 import java.time.Clock;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * {@code SnapshotLifecycleService} manages snapshot policy scheduling and triggering of the
@@ -32,6 +35,7 @@
 public class SnapshotLifecycleService implements LocalNodeMasterListener, Closeable, ClusterStateListener {
 
     private static final Logger logger = LogManager.getLogger(SnapshotLifecycleMetadata.class);
+    private static final String JOB_PATTERN_SUFFIX = "-\\d+$";
 
     private final SchedulerEngine scheduler;
     private final ClusterService clusterService;
@@ -39,21 +43,23 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
     private final Map<String, SchedulerEngine.Job> scheduledTasks = ConcurrentCollections.newConcurrentMap();
     private volatile boolean isMaster = false;
 
-    public SnapshotLifecycleService(Settings settings, Client client, ClusterService clusterService,
+    public SnapshotLifecycleService(Settings settings,
+                                    Supplier<SnapshotLifecycleTask> taskSupplier,
+                                    ClusterService clusterService,
                                     Clock clock) {
         this.scheduler = new SchedulerEngine(settings, clock);
         this.clusterService = clusterService;
-        this.snapshotTask = new SnapshotLifecycleTask(client);
+        this.snapshotTask = taskSupplier.get();
         clusterService.addLocalNodeMasterListener(this); // TODO: change this not to use 'this'
         clusterService.addListener(this);
     }
 
     @Override
-    public void clusterChanged(ClusterChangedEvent event) {
+    public void clusterChanged(final ClusterChangedEvent event) {
         if (this.isMaster) {
-            // TODO: handle modified policies (currently they are ignored)
-            // TODO: handle deleted policies
-            scheduleSnapshotJobs(event.state());
+            final ClusterState state = event.state();
+            scheduleSnapshotJobs(state);
+            cleanupDeletedPolicies(state);
         }
     }
 
@@ -71,6 +77,11 @@ public void offMaster() {
         cancelSnapshotJobs();
     }
 
+    // Only used for testing
+    SchedulerEngine getScheduler() {
+        return this.scheduler;
+    }
+
     /**
      * Schedule all non-scheduled snapshot jobs contained in the cluster state
      */
@@ -81,35 +92,85 @@ public void scheduleSnapshotJobs(final ClusterState state) {
         }
     }
 
+    public void cleanupDeletedPolicies(final ClusterState state) {
+        SnapshotLifecycleMetadata snapMeta = state.metaData().custom(SnapshotLifecycleMetadata.TYPE);
+        if (snapMeta != null) {
+            // Retrieve all of the expected policy job ids from the policies in the metadata
+            final Set<String> policyJobIds = snapMeta.getSnapshotConfigurations().values().stream()
+                .map(SnapshotLifecycleService::getJobId)
+                .collect(Collectors.toSet());
+
+            // Cancel all scheduled jobs that are *NOT* in the expected set of policy job ids
+            scheduledTasks.keySet().stream()
+                .filter(jobId -> policyJobIds.contains(jobId) == false)
+                .forEach(this::cancelScheduledSnapshot);
+        }
+    }
+
     /**
-     * Schedule the {@link SnapshotLifecyclePolicy} job if it does not already exist. If the job already
-     * exists it is not interfered with.
+     * Schedule the {@link SnapshotLifecyclePolicy} job if it does not already exist. First checks
+     * to see if any previous versions of the policy were scheduled, and if so, cancels those. If
+     * the same version of the policy has already been scheduled it does not overwrite the job.
      */
     public void maybeScheduleSnapshot(final SnapshotLifecyclePolicyMetadata snapshotLifecyclePolicy) {
-        final String jobId = snapshotLifecyclePolicy.getPolicy().getId();
+        final String jobId = getJobId(snapshotLifecyclePolicy);
+        final Pattern existingJobPattern = Pattern.compile(snapshotLifecyclePolicy.getPolicy().getId() + JOB_PATTERN_SUFFIX);
+
+        // Find and cancel any existing jobs for this policy
+        final boolean existingJobsFoundAndCancelled = scheduledTasks.keySet().stream()
+            // Find all jobs matching the `<policy-id>-\d+` pattern
+            .filter(jId -> existingJobPattern.matcher(jId).matches())
+            // Filter out the job that has not been changed (it matches the job id exactly, meaning the version is the same)
+            .filter(jId -> jId.equals(jobId) == false)
+            .map(existingJobId -> {
+                // Cancel the existing job so the new one can be scheduled
+                logger.debug("removing existing snapshot lifecycle job [{}] as it has been updated", existingJobId);
+                scheduledTasks.remove(existingJobId);
+                boolean existed = scheduler.remove(existingJobId);
+                assert existed : "expected job for " + existingJobId + " to exist in scheduler";
+                return existed;
+            })
+            .reduce(false, (a, b) -> a || b);
+
+        // Now atomically schedule the new job and add it to the scheduled tasks map. If the jobId
+        // is identical to an existing job (meaning the version has not changed) then this does
+        // not reschedule it.
         scheduledTasks.computeIfAbsent(jobId, id -> {
             final SchedulerEngine.Job job = new SchedulerEngine.Job(jobId,
                 new CronSchedule(snapshotLifecyclePolicy.getPolicy().getSchedule()));
-            logger.info("scheduling snapshot lifecycle job [{}]", jobId);
+            if (existingJobsFoundAndCancelled) {
+                logger.info("rescheduling updated snapshot lifecycle job [{}]", jobId);
+            } else {
+                logger.info("scheduling snapshot lifecycle job [{}]", jobId);
+            }
             scheduler.add(job);
             return job;
         });
     }
 
+    /**
+     * Generate the job id for a given policy metadata. The job id is {@code <policy-id>-<version>}
+     */
+    static String getJobId(SnapshotLifecyclePolicyMetadata policyMeta) {
+        return policyMeta.getPolicy().getId() + "-" + policyMeta.getVersion();
+    }
+
     /**
      * Cancel all scheduled snapshot jobs
      */
     public void cancelSnapshotJobs() {
+        logger.trace("cancelling all snapshot lifecycle jobs");
         scheduler.scheduledJobIds().forEach(scheduler::remove);
         scheduledTasks.clear();
     }
 
     /**
-     * Cancel the given snapshot lifecycle id
+     * Cancel the given policy job id (from {@link #getJobId(SnapshotLifecyclePolicyMetadata)})
      */
-    public void cancelScheduledSnapshot(final String snapshotLifecycleId) {
-        scheduledTasks.remove(snapshotLifecycleId);
-        scheduler.remove(snapshotLifecycleId);
+    public void cancelScheduledSnapshot(final String lifecycleJobId) {
+        logger.debug("cancelling snapshot lifecycle job [{}] as it no longer exists", lifecycleJobId);
+        scheduledTasks.remove(lifecycleJobId);
+        scheduler.remove(lifecycleJobId);
     }
 
     @Override
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleTask.java
index 3c1b1df34fddc..c8b1db38035e8 100644
--- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleTask.java
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleTask.java
@@ -17,7 +17,7 @@ public class SnapshotLifecycleTask implements SchedulerEngine.Listener {
 
     private final Client client;
 
-    SnapshotLifecycleTask(final Client client) {
+    public SnapshotLifecycleTask(final Client client) {
         this.client = client;
     }
 
diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleServiceTests.java
new file mode 100644
index 0000000000000..599fa747d2eec
--- /dev/null
+++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleServiceTests.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.snapshotlifecycle;
+
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ClusterServiceUtils;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
+import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+
+public class SnapshotLifecycleServiceTests extends ESTestCase {
+
+    public void testGetJobId() {
+        String id = randomAlphaOfLengthBetween(1, 10) + (randomBoolean() ? "" : randomLong());
+        SnapshotLifecyclePolicy policy = createPolicy(id);
+        long version = randomNonNegativeLong();
+        SnapshotLifecyclePolicyMetadata meta = new SnapshotLifecyclePolicyMetadata(policy, Collections.emptyMap(), version, 1);
+        assertThat(SnapshotLifecycleService.getJobId(meta), equalTo(id + "-" + version));
+    }
+
+    /**
+     * Test that new policies are scheduled correctly, that updated policies are rescheduled,
+     * and that deleted policies have their schedules cancelled.
+     */
+    public void testPolicyCRUD() throws Exception {
+        ClockMock clock = new ClockMock();
+        final AtomicInteger triggerCount = new AtomicInteger(0);
+        final AtomicReference<Consumer<SchedulerEngine.Event>> trigger = new AtomicReference<>(e -> triggerCount.incrementAndGet());
+        try (ThreadPool threadPool = new TestThreadPool("test");
+             ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
+             SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
+                 () -> new FakeSnapshotTask(e -> trigger.get().accept(e)), clusterService, clock)) {
+
+            sls.offMaster();
+            SnapshotLifecycleMetadata snapMeta = new SnapshotLifecycleMetadata(Collections.emptyMap());
+            ClusterState previousState = createState(snapMeta);
+            Map<String, SnapshotLifecyclePolicyMetadata> policies = new HashMap<>();
+
+            SnapshotLifecyclePolicyMetadata policy =
+                new SnapshotLifecyclePolicyMetadata(createPolicy("foo", "*/1 * * * * ?"), // trigger every second
+                    Collections.emptyMap(), 1, 1);
+            policies.put(policy.getPolicy().getId(), policy);
+            snapMeta = new SnapshotLifecycleMetadata(policies);
+            ClusterState state = createState(snapMeta);
+            ClusterChangedEvent event = new ClusterChangedEvent("1", state, previousState);
+            trigger.set(e -> {
+                fail("trigger should not be invoked");
+            });
+            sls.clusterChanged(event);
+
+            // Since the service does not think it is master, it should not be triggered or scheduled
+            assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
+
+            // Change the service to think it's on the master node, events should be scheduled now
+            sls.onMaster();
+            trigger.set(e -> triggerCount.incrementAndGet());
+            sls.clusterChanged(event);
+            assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-1")));
+
+            assertBusy(() -> assertThat(triggerCount.get(), greaterThan(0)));
+
+            clock.freeze();
+            int currentCount = triggerCount.get();
+            previousState = state;
+            SnapshotLifecyclePolicyMetadata newPolicy =
+                new SnapshotLifecyclePolicyMetadata(createPolicy("foo", "*/1 * * * * ?"), Collections.emptyMap(), 2, 2);
+            policies.put(policy.getPolicy().getId(), newPolicy);
+            state = createState(new SnapshotLifecycleMetadata(policies));
+            event = new ClusterChangedEvent("2", state, previousState);
+            sls.clusterChanged(event);
+            assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-2")));
+
+            trigger.set(e -> {
+                // Make sure the job got updated
+                assertThat(e.getJobName(), equalTo("foo-2"));
+                triggerCount.incrementAndGet();
+            });
+            clock.fastForwardSeconds(1);
+
+            assertBusy(() -> assertThat(triggerCount.get(), greaterThan(currentCount)));
+
+            final int currentCount2 = triggerCount.get();
+            previousState = state;
+            // Create a state simulating the policy being deleted
+            state = createState(new SnapshotLifecycleMetadata(Collections.emptyMap()));
+            event = new ClusterChangedEvent("2", state, previousState);
+            sls.clusterChanged(event);
+            clock.fastForwardSeconds(2);
+
+            // The existing job should be cancelled and no longer trigger
+            assertThat(triggerCount.get(), equalTo(currentCount2));
+            assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
+
+            // When the service is no longer master, all jobs should be automatically cancelled
+            policy =
+                new SnapshotLifecyclePolicyMetadata(createPolicy("foo", "*/1 * * * * ?"), // trigger every second
+                    Collections.emptyMap(), 3, 1);
+            policies.put(policy.getPolicy().getId(), policy);
+            snapMeta = new SnapshotLifecycleMetadata(policies);
+            previousState = state;
+            state = createState(snapMeta);
+            event = new ClusterChangedEvent("1", state, previousState);
+            trigger.set(e -> triggerCount.incrementAndGet());
+            sls.clusterChanged(event);
+            clock.fastForwardSeconds(2);
+
+            // Make sure at least one triggers and the job is scheduled
+            assertBusy(() -> assertThat(triggerCount.get(), greaterThan(currentCount2)));
+            assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-3")));
+
+            // Signify becoming non-master, the jobs should all be cancelled
+            sls.offMaster();
+            assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
+
+            threadPool.shutdownNow();
+        }
+    }
+
+    /**
+     * Test that policy ids ending in numbers do not cause confusion in the way job ids are generated
+     */
+    public void testPolicyNamesEndingInNumbers() throws Exception {
+        ClockMock clock = new ClockMock();
+        final AtomicInteger triggerCount = new AtomicInteger(0);
+        final AtomicReference<Consumer<SchedulerEngine.Event>> trigger = new AtomicReference<>(e -> triggerCount.incrementAndGet());
+        try (ThreadPool threadPool = new TestThreadPool("test");
+             ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
+             SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
+                 () -> new FakeSnapshotTask(e -> trigger.get().accept(e)), clusterService, clock)) {
+            sls.onMaster();
+
+            SnapshotLifecycleMetadata snapMeta = new SnapshotLifecycleMetadata(Collections.emptyMap());
+            ClusterState previousState = createState(snapMeta);
+            Map<String, SnapshotLifecyclePolicyMetadata> policies = new HashMap<>();
+
+            SnapshotLifecyclePolicyMetadata policy =
+                new SnapshotLifecyclePolicyMetadata(createPolicy("foo-2", "30 * * * * ?"),
+                    Collections.emptyMap(), 1, 1);
+            policies.put(policy.getPolicy().getId(), policy);
+            snapMeta = new SnapshotLifecycleMetadata(policies);
+            ClusterState state = createState(snapMeta);
+            ClusterChangedEvent event = new ClusterChangedEvent("1", state, previousState);
+            sls.clusterChanged(event);
+
+            assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-2-1")));
+
+            previousState = state;
+            SnapshotLifecyclePolicyMetadata secondPolicy =
+                new SnapshotLifecyclePolicyMetadata(createPolicy("foo-1", "45 * * * * ?"),
+                    Collections.emptyMap(), 2, 1);
+            policies.put(secondPolicy.getPolicy().getId(), secondPolicy);
+            snapMeta = new SnapshotLifecycleMetadata(policies);
+            state = createState(snapMeta);
+            event = new ClusterChangedEvent("2", state, previousState);
+            sls.clusterChanged(event);
+
+            assertThat(sls.getScheduler().scheduledJobIds(), containsInAnyOrder("foo-2-1", "foo-1-2"));
+
+            sls.offMaster();
+            assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
+
+            threadPool.shutdownNow();
+        }
+    }
+
+    class FakeSnapshotTask extends SnapshotLifecycleTask {
+        private final Consumer<SchedulerEngine.Event> onTriggered;
+
+        FakeSnapshotTask(Consumer<SchedulerEngine.Event> onTriggered) {
+            super(null);
+            this.onTriggered = onTriggered;
+        }
+
+        @Override
+        public void triggered(SchedulerEngine.Event event) {
+            logger.info("--> fake snapshot task triggered");
+            onTriggered.accept(event);
+        }
+    }
+
+    public ClusterState createState(SnapshotLifecycleMetadata snapMeta) {
+        MetaData metaData = MetaData.builder()
+            .putCustom(SnapshotLifecycleMetadata.TYPE, snapMeta)
+            .build();
+        return ClusterState.builder(new ClusterName("cluster"))
+            .metaData(metaData)
+            .build();
+    }
+
+    public SnapshotLifecyclePolicy createPolicy(String id) {
+        return createPolicy(id, randomSchedule());
+    }
+
+    public SnapshotLifecyclePolicy createPolicy(String id, String schedule) {
+        Map<String, Object> config = new HashMap<>();
+        config.put("ignore_unavailable", randomBoolean());
+        List<String> indices = new ArrayList<>();
+        indices.add("foo-*");
+        indices.add(randomAlphaOfLength(4));
+        config.put("indices", indices);
+        return new SnapshotLifecyclePolicy(id, randomAlphaOfLength(4), schedule, randomAlphaOfLength(4), config);
+    }
+
+    private String randomSchedule() {
+        return randomIntBetween(0, 59) + " " +
+            randomIntBetween(0, 59) + " " +
+            randomIntBetween(0, 12) + " * * ?";
+    }
+}
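
Reviewer note: the core of this change is the versioned job id ({@code <policy-id>-<version>}) combined with the `-\d+$` suffix pattern used to find stale versions of the same policy. The standalone sketch below is not part of the PR; the class name and sample ids are invented for illustration. It mirrors the matching logic and shows why a policy id that itself ends in a number, such as `foo-2`, is not mistaken for an older version of policy `foo`, which is what `testPolicyNamesEndingInNumbers` asserts.

```java
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Illustrative only: mirrors the job-id scheme from SnapshotLifecycleService.
public class JobIdMatchingSketch {

    // Equivalent of SnapshotLifecycleService.getJobId(): "<policy-id>-<version>"
    static String jobId(String policyId, long version) {
        return policyId + "-" + version;
    }

    public static void main(String[] args) {
        // Hypothetical scheduler state: policy "foo" at version 1 and policy "foo-2" at version 1
        Set<String> scheduledJobIds = Set.of(jobId("foo", 1), jobId("foo-2", 1));

        // Policy "foo" was updated to version 2; older versions match "<policy-id>-\d+$"
        String newJobId = jobId("foo", 2);
        Pattern stale = Pattern.compile("foo" + "-\\d+$");

        Set<String> toCancel = scheduledJobIds.stream()
            // whole-string match, so "foo-2-1" (policy "foo-2", version 1) does not qualify
            .filter(id -> stale.matcher(id).matches())
            // keep the job if it is already at the new version
            .filter(id -> id.equals(newJobId) == false)
            .collect(Collectors.toSet());

        System.out.println(toCancel); // prints [foo-1]
    }
}
```

The id-equality filter is what keeps an unchanged policy's job scheduled instead of being cancelled and re-added on every cluster state update, matching the `computeIfAbsent` call in `maybeScheduleSnapshot`.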