diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryItem.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryItem.java index 8bd51e88704d0..380eaa8a65104 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryItem.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryItem.java @@ -15,7 +15,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -40,7 +39,10 @@ public class SnapshotHistoryItem implements Writeable, ToXContentObject { static final ParseField SNAPSHOT_NAME = new ParseField("snapshot_name"); static final ParseField OPERATION = new ParseField("operation"); static final ParseField SUCCESS = new ParseField("success"); - private static final String CREATE_OPERATION = "CREATE"; + + public static final String CREATE_OPERATION = "CREATE"; + public static final String DELETE_OPERATION = "DELETE"; + protected final long timestamp; protected final String policyId; protected final String repository; @@ -98,25 +100,34 @@ public static SnapshotHistoryItem parse(XContentParser parser, String name) { this.errorDetails = errorDetails; } - public static SnapshotHistoryItem successRecord(long timestamp, SnapshotLifecyclePolicy policy, String snapshotName) { + public static SnapshotHistoryItem creationSuccessRecord(long timestamp, SnapshotLifecyclePolicy policy, String snapshotName) { return new SnapshotHistoryItem(timestamp, policy.getId(), policy.getRepository(), snapshotName, CREATE_OPERATION, true, policy.getConfig(), null); } - 
public static SnapshotHistoryItem failureRecord(long timeStamp, SnapshotLifecyclePolicy policy, String snapshotName, - Exception exception) throws IOException { - ToXContent.Params stacktraceParams = new ToXContent.MapParams(Collections.singletonMap(REST_EXCEPTION_SKIP_STACK_TRACE, "false")); - String exceptionString; - try (XContentBuilder causeXContentBuilder = JsonXContent.contentBuilder()) { - causeXContentBuilder.startObject(); - ElasticsearchException.generateThrowableXContent(causeXContentBuilder, stacktraceParams, exception); - causeXContentBuilder.endObject(); - exceptionString = BytesReference.bytes(causeXContentBuilder).utf8ToString(); - } + public static SnapshotHistoryItem creationFailureRecord(long timeStamp, SnapshotLifecyclePolicy policy, String snapshotName, + Exception exception) throws IOException { + String exceptionString = exceptionToString(exception); return new SnapshotHistoryItem(timeStamp, policy.getId(), policy.getRepository(), snapshotName, CREATE_OPERATION, false, policy.getConfig(), exceptionString); } + public static SnapshotHistoryItem deletionSuccessRecord(long timestamp, String snapshotName, String policyId, String repository) { + return new SnapshotHistoryItem(timestamp, policyId, repository, snapshotName, DELETE_OPERATION, true, null, null); + } + + public static SnapshotHistoryItem deletionPossibleSuccessRecord(long timestamp, String snapshotName, String policyId, String repository, + String details) { + return new SnapshotHistoryItem(timestamp, policyId, repository, snapshotName, DELETE_OPERATION, true, null, details); + } + + public static SnapshotHistoryItem deletionFailureRecord(long timestamp, String snapshotName, String policyId, String repository, + Exception exception) throws IOException { + String exceptionString = exceptionToString(exception); + return new SnapshotHistoryItem(timestamp, policyId, repository, snapshotName, DELETE_OPERATION, false, + null, exceptionString); + } + public SnapshotHistoryItem(StreamInput 
in) throws IOException { this.timestamp = in.readVLong(); this.policyId = in.readString(); @@ -220,4 +231,16 @@ public int hashCode() { public String toString() { return Strings.toString(this); } + + private static String exceptionToString(Exception exception) throws IOException { + Params stacktraceParams = new MapParams(Collections.singletonMap(REST_EXCEPTION_SKIP_STACK_TRACE, "false")); + String exceptionString; + try (XContentBuilder causeXContentBuilder = JsonXContent.contentBuilder()) { + causeXContentBuilder.startObject(); + ElasticsearchException.generateThrowableXContent(causeXContentBuilder, stacktraceParams, exception); + causeXContentBuilder.endObject(); + exceptionString = BytesReference.bytes(causeXContentBuilder).utf8ToString(); + } + return exceptionString; + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryStoreTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryStoreTests.java index ae2bb78e3512b..df8ce48213189 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryStoreTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryStoreTests.java @@ -59,7 +59,7 @@ public void testNoActionIfDisabled() { final long timestamp = randomNonNegativeLong(); SnapshotLifecyclePolicy.ResolverContext context = new SnapshotLifecyclePolicy.ResolverContext(timestamp); String snapshotId = policy.generateSnapshotName(context); - SnapshotHistoryItem record = SnapshotHistoryItem.successRecord(timestamp, policy, snapshotId); + SnapshotHistoryItem record = SnapshotHistoryItem.creationSuccessRecord(timestamp, policy, snapshotId); client.setVerifier((a,r,l) -> { fail("the history store is disabled, no action should have been taken"); @@ -76,7 +76,7 @@ public void testPut() throws Exception { SnapshotLifecyclePolicy.ResolverContext context = new 
SnapshotLifecyclePolicy.ResolverContext(timestamp); String snapshotId = policy.generateSnapshotName(context); { - SnapshotHistoryItem record = SnapshotHistoryItem.successRecord(timestamp, policy, snapshotId); + SnapshotHistoryItem record = SnapshotHistoryItem.creationSuccessRecord(timestamp, policy, snapshotId); AtomicInteger calledTimes = new AtomicInteger(0); client.setVerifier((action, request, listener) -> { @@ -111,7 +111,7 @@ public void testPut() throws Exception { { final String cause = randomAlphaOfLength(9); Exception failureException = new RuntimeException(cause); - SnapshotHistoryItem record = SnapshotHistoryItem.failureRecord(timestamp, policy, snapshotId, failureException); + SnapshotHistoryItem record = SnapshotHistoryItem.creationFailureRecord(timestamp, policy, snapshotId, failureException); AtomicInteger calledTimes = new AtomicInteger(0); client.setVerifier((action, request, listener) -> { diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java index fcabbc70c09ff..37fc062c6f569 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java @@ -37,10 +37,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.Optional; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem.CREATE_OPERATION; +import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem.DELETE_OPERATION; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ 
-104,7 +106,7 @@ public void testFullPolicySnapshot() throws Exception { Map metadata = (Map) snapResponse.get(0).get("metadata"); assertNotNull(metadata); assertThat(metadata.get("policy"), equalTo(policyName)); - assertHistoryIsPresent(policyName, true, repoId); + assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION); // Check that the last success date was written to the cluster state Request getReq = new Request("GET", "/_slm/policy/" + policyName); @@ -125,7 +127,7 @@ public void testFullPolicySnapshot() throws Exception { String lastSnapshotName = (String) lastSuccessObject.get("snapshot_name"); assertThat(lastSnapshotName, startsWith("snap-")); - assertHistoryIsPresent(policyName, true, repoId); + assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION); Map stats = getSLMStats(); Map policyStats = (Map) stats.get(SnapshotLifecycleStats.POLICY_STATS.getPreferredName()); @@ -174,7 +176,7 @@ public void testPolicyFailure() throws Exception { assertNotNull(snapshotName); assertThat(snapshotName, startsWith("snap-")); } - assertHistoryIsPresent(policyName, false, repoName); + assertHistoryIsPresent(policyName, false, repoName, CREATE_OPERATION); Map stats = getSLMStats(); Map policyStats = (Map) stats.get(SnapshotLifecycleStats.POLICY_STATS.getPreferredName()); @@ -224,7 +226,7 @@ public void testPolicyManualExecution() throws Exception { final Map metadata = extractMetadata(snapshotResponseMap, snapshotName); assertNotNull(metadata); assertThat(metadata.get("policy"), equalTo(policyName)); - assertHistoryIsPresent(policyName, true, repoId); + assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION); } catch (ResponseException e) { fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity())); } @@ -278,7 +280,7 @@ public void testBasicTimeBasedRetenion() throws Exception { final Map metadata = extractMetadata(snapshotResponseMap, snapshotName); assertNotNull(metadata); 
assertThat(metadata.get("policy"), equalTo(policyName)); - assertHistoryIsPresent(policyName, true, repoId); + assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION); } catch (ResponseException e) { fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity())); } @@ -306,6 +308,7 @@ public void testBasicTimeBasedRetenion() throws Exception { } catch (ResponseException e) { assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception")); } + assertHistoryIsPresent(policyName, true, repoId, DELETE_OPERATION); Map stats = getSLMStats(); Map policyStats = (Map) stats.get(SnapshotLifecycleStats.POLICY_STATS.getPreferredName()); @@ -414,7 +417,7 @@ private Map getSLMStats() { } // This method should be called inside an assertBusy, it has no retry logic of its own - private void assertHistoryIsPresent(String policyName, boolean success, String repository) throws IOException { + private void assertHistoryIsPresent(String policyName, boolean success, String repository, String operation) throws IOException { final Request historySearchRequest = new Request("GET", ".slm-history*/_search"); historySearchRequest.setJsonEntity("{\n" + " \"query\": {\n" + @@ -437,7 +440,7 @@ private void assertHistoryIsPresent(String policyName, boolean success, String r " },\n" + " {\n" + " \"term\": {\n" + - " \"operation\": \"CREATE\"\n" + + " \"operation\": \"" + operation + "\"\n" + " }\n" + " }\n" + " ]\n" + diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java index 99dcdfc2a7b17..415e0f2ae4582 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java @@ -161,7 +161,7 @@ public Collection createComponents(Client client, ClusterService cluster 
snapshotLifecycleService.set(new SnapshotLifecycleService(settings, () -> new SnapshotLifecycleTask(client, clusterService, snapshotHistoryStore.get()), clusterService, getClock())); snapshotRetentionService.set(new SnapshotRetentionService(settings, - () -> new SnapshotRetentionTask(client, clusterService, System::nanoTime), + () -> new SnapshotRetentionTask(client, clusterService, System::nanoTime, snapshotHistoryStore.get()), clusterService, getClock())); return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotHistoryStore.get(), snapshotRetentionService.get()); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java index 10898493fb8e1..993dac40252e6 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java @@ -94,7 +94,8 @@ public void onResponse(CreateSnapshotResponse createSnapshotResponse) { final long timestamp = Instant.now().toEpochMilli(); clusterService.submitStateUpdateTask("slm-record-success-" + policyMetadata.getPolicy().getId(), WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), timestamp)); - historyStore.putAsync(SnapshotHistoryItem.successRecord(timestamp, policyMetadata.getPolicy(), request.snapshot())); + historyStore.putAsync(SnapshotHistoryItem.creationSuccessRecord(timestamp, policyMetadata.getPolicy(), + request.snapshot())); } @Override @@ -106,7 +107,8 @@ public void onFailure(Exception e) { WriteJobStatus.failure(policyMetadata.getPolicy().getId(), request.snapshot(), timestamp, e)); final SnapshotHistoryItem failureRecord; try { - failureRecord = SnapshotHistoryItem.failureRecord(timestamp, policyMetadata.getPolicy(), request.snapshot(), e); + failureRecord = 
SnapshotHistoryItem.creationFailureRecord(timestamp, policyMetadata.getPolicy(), + request.snapshot(), e); historyStore.putAsync(failureRecord); } catch (IOException ex) { // This shouldn't happen unless there's an issue with serializing the original exception, which shouldn't happen diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index 0d3f70851e402..fa292e14fc8f8 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -28,7 +28,11 @@ import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata; import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy; import org.elasticsearch.xpack.core.slm.SnapshotRetentionConfiguration; +import org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem; +import org.elasticsearch.xpack.core.slm.history.SnapshotHistoryStore; +import java.io.IOException; +import java.time.Instant; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -38,10 +42,13 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.LongSupplier; import java.util.stream.Collectors; +import static org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy.POLICY_ID_METADATA_FIELD; + /** * The {@code SnapshotRetentionTask} is invoked by the scheduled job from the * {@link SnapshotRetentionService}. 
It is responsible for retrieving the snapshots for repositories @@ -56,11 +63,14 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener { private final Client client; private final ClusterService clusterService; private final LongSupplier nowNanoSupplier; + private final SnapshotHistoryStore historyStore; - public SnapshotRetentionTask(Client client, ClusterService clusterService, LongSupplier nowNanoSupplier) { + public SnapshotRetentionTask(Client client, ClusterService clusterService, LongSupplier nowNanoSupplier, + SnapshotHistoryStore historyStore) { this.client = new OriginSettingClient(client, ClientHelper.INDEX_LIFECYCLE_ORIGIN); this.clusterService = clusterService; this.nowNanoSupplier = nowNanoSupplier; + this.historyStore = historyStore; } @Override @@ -157,7 +167,7 @@ static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, Map Optional.ofNullable(info.userMetadata()) - .map(meta -> meta.get(SnapshotLifecyclePolicy.POLICY_ID_METADATA_FIELD)) + .map(meta -> meta.get(POLICY_ID_METADATA_FIELD)) .map(pId -> pId.equals(policyId)) .orElse(false)) .collect(Collectors.toList())) @@ -247,21 +257,44 @@ void deleteSnapshots(Map> snapshotsToDelete, logger.info("starting snapshot retention deletion for [{}] snapshots", count); long startTime = nowNanoSupplier.getAsLong(); - int deleted = 0; + final AtomicInteger deleted = new AtomicInteger(0); + final AtomicInteger failed = new AtomicInteger(0); for (Map.Entry> entry : snapshotsToDelete.entrySet()) { String repo = entry.getKey(); List snapshots = entry.getValue(); for (SnapshotInfo info : snapshots) { - deleteSnapshot(getPolicyId(info), repo, info.snapshotId(), slmStats); - deleted++; + final String policyId = getPolicyId(info); + deleteSnapshot(policyId, repo, info.snapshotId(), slmStats, ActionListener.wrap(acknowledgedResponse -> { + deleted.incrementAndGet(); + if (acknowledgedResponse.isAcknowledged()) { + 
historyStore.putAsync(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(), + info.snapshotId().getName(), policyId, repo)); + } else { // not acknowledged: still record the deletion, with an explanatory details message + historyStore.putAsync(SnapshotHistoryItem.deletionPossibleSuccessRecord(Instant.now().toEpochMilli(), + info.snapshotId().getName(), policyId, repo, + "deletion request issued successfully, no acknowledgement received")); + } + }, e -> { + failed.incrementAndGet(); + try { + final SnapshotHistoryItem result = SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), + info.snapshotId().getName(), policyId, repo, e); + historyStore.putAsync(result); + } catch (IOException ex) { + // This shouldn't happen unless there's an issue with serializing the original exception + logger.error(new ParameterizedMessage( + "failed to record snapshot deletion failure for snapshot lifecycle policy [{}]", + policyId), ex); + } + })); // Check whether we have exceeded the maximum time allowed to spend deleting // snapshots, if we have, short-circuit the rest of the deletions TimeValue elapsedDeletionTime = TimeValue.timeValueNanos(nowNanoSupplier.getAsLong() - startTime); logger.trace("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), elapsedDeletionTime); if (elapsedDeletionTime.compareTo(maximumTime) > 0) { logger.info("maximum snapshot retention deletion time reached, time spent: [{}]," + - " maximum allowed time: [{}], deleted {} out of {} snapshots scheduled for deletion", - elapsedDeletionTime, maximumTime, deleted, count); + " maximum allowed time: [{}], deleted [{}] out of [{}] snapshots scheduled for deletion, failed to delete [{}]", + elapsedDeletionTime, maximumTime, deleted, count, failed); slmStats.deletionTime(elapsedDeletionTime); slmStats.retentionTimedOut(); return; @@ -275,8 +308,14 @@ void deleteSnapshots(Map> snapshotsToDelete, /** * Delete the given snapshot from the repository in blocking manner + * + * @param repo The repository the snapshot is in + * @param snapshot The snapshot metadata + * @param
listener {@link ActionListener#onResponse(Object)} is called with the delete response, whether or not it was acknowledged. + * {@link ActionListener#onFailure(Exception)} is called if the delete request fails or the wait is interrupted. + */ - void deleteSnapshot(String slmPolicy, String repo, SnapshotId snapshot, SnapshotLifecycleStats slmStats) { + void deleteSnapshot(String slmPolicy, String repo, SnapshotId snapshot, SnapshotLifecycleStats slmStats, + ActionListener listener) { logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, snapshot); CountDownLatch latch = new CountDownLatch(1); client.admin().cluster().prepareDeleteSnapshot(repo, snapshot.getName()) @@ -288,6 +327,7 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) { } else { logger.warn("[{}] snapshot [{}] delete issued but the request was not acknowledged", repo, snapshot); } + listener.onResponse(acknowledgedResponse); slmStats.snapshotDeleted(slmPolicy); } @@ -296,6 +336,7 @@ public void onFailure(Exception e) { logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention", repo, snapshot), e); slmStats.snapshotDeleteFailure(slmPolicy); + listener.onFailure(e); } }, latch)); try { @@ -305,6 +346,7 @@ public void onFailure(Exception e) { } catch (InterruptedException e) { logger.error(new ParameterizedMessage("[{}] deletion of snapshot [{}] interrupted", repo, snapshot), e); + listener.onFailure(e); slmStats.snapshotDeleteFailure(slmPolicy); } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java index 257dcb518662f..ff42f48961f3b 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java @@ -20,6 +20,7 @@ import
org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; +import org.elasticsearch.xpack.core.slm.history.SnapshotHistoryStore; import org.elasticsearch.xpack.core.watcher.watch.ClockMock; import java.util.Collections; @@ -67,7 +68,7 @@ public void testJobsAreScheduled() { private static class FakeRetentionTask extends SnapshotRetentionTask { FakeRetentionTask() { - super(mock(Client.class), null, System::nanoTime); + super(mock(Client.class), null, System::nanoTime, mock(SnapshotHistoryStore.class)); } @Override diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java index b137473551bac..86e7a34214bbb 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.slm; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -28,7 +29,9 @@ import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy; import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyMetadata; import org.elasticsearch.xpack.core.slm.SnapshotRetentionConfiguration; +import org.elasticsearch.xpack.core.slm.history.SnapshotHistoryStore; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -40,16 +43,20 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import 
java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem.DELETE_OPERATION; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.not; public class SnapshotRetentionTaskTests extends ESTestCase { @@ -128,60 +135,105 @@ public void testSnapshotEligibleForDeletion() { assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(false)); } - public void testRetentionTask() throws Exception { + public void testRetentionTaskSuccess() throws Exception { + retentionTaskTest(true); + } + + public void testRetentionTaskFailure() throws Exception { + retentionTaskTest(false); + } + + private void retentionTaskTest(final boolean deletionSuccess) throws Exception { try (ThreadPool threadPool = new TestThreadPool("slm-test"); ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); Client noOpClient = new NoOpClient("slm-test")) { - SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy("policy", "snap", "1 * * * * ?", - "repo", null, new SnapshotRetentionConfiguration(TimeValue.timeValueDays(30), null, null)); + final String policyId = "policy"; + final String repoId = "repo"; + SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyId, "snap", "1 * * * * ?", + repoId, null, new SnapshotRetentionConfiguration(TimeValue.timeValueDays(30), null, null)); ClusterState state = createState(policy); ClusterServiceUtils.setState(clusterService, state); final SnapshotInfo eligibleSnapshot = new 
SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), - 0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + 0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); final SnapshotInfo ineligibleSnapshot = new SnapshotInfo(new SnapshotId("name2", "uuid2"), Collections.singletonList("index"), System.currentTimeMillis(), null, System.currentTimeMillis() + 1, 1, - Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); + + Set deleted = ConcurrentHashMap.newKeySet(); + Set deletedSnapshotsInHistory = ConcurrentHashMap.newKeySet(); + CountDownLatch deletionLatch = new CountDownLatch(1); + CountDownLatch historyLatch = new CountDownLatch(1); - AtomicReference> deleted = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); MockSnapshotRetentionTask retentionTask = new MockSnapshotRetentionTask(noOpClient, clusterService, + new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC, + (historyItem) -> { + assertEquals(deletionSuccess, historyItem.isSuccess()); + if (historyItem.isSuccess() == false) { + assertThat(historyItem.getErrorDetails(), containsString("deletion_failed")); + } + assertEquals(policyId, historyItem.getPolicyId()); + assertEquals(repoId, historyItem.getRepository()); + assertEquals(DELETE_OPERATION, historyItem.getOperation()); + deletedSnapshotsInHistory.add(historyItem.getSnapshotName()); + historyLatch.countDown(); + }), () -> { List snaps = new ArrayList<>(2); snaps.add(eligibleSnapshot); snaps.add(ineligibleSnapshot); logger.info("--> retrieving snapshots [{}]", snaps); - return Collections.singletonMap("repo", snaps); + return Collections.singletonMap(repoId, snaps); + }, + (deletionPolicyId, repo, snapId, slmStats, listener) -> { + logger.info("--> deleting {} from repo {}", snapId, repo); + 
deleted.add(snapId); + if (deletionSuccess) { + listener.onResponse(new AcknowledgedResponse(true)); + } else { + listener.onFailure(new RuntimeException("deletion_failed")); + } + deletionLatch.countDown(); }, - snapsToDelete -> { - logger.info("--> deleting {}", snapsToDelete); - deleted.set(snapsToDelete.values().stream().flatMap(Collection::stream).collect(Collectors.toList())); - latch.countDown(); - }); + System::nanoTime); long time = System.currentTimeMillis(); retentionTask.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_JOB_ID, time, time)); - latch.await(10, TimeUnit.SECONDS); + deletionLatch.await(10, TimeUnit.SECONDS); + + assertThat("something should have been deleted", deleted, not(empty())); + assertThat("one snapshot should have been deleted", deleted, hasSize(1)); + assertThat(deleted, contains(eligibleSnapshot.snapshotId())); - assertNotNull("something should have been deleted", deleted.get()); - assertThat("one snapshot should have been deleted", deleted.get().size(), equalTo(1)); - assertThat(deleted.get().get(0), equalTo(eligibleSnapshot)); + boolean historySuccess = historyLatch.await(10, TimeUnit.SECONDS); + assertThat("expected history entries for 1 snapshot deletions", historySuccess, equalTo(true)); + assertThat(deletedSnapshotsInHistory, contains(eligibleSnapshot.snapshotId().getName())); threadPool.shutdownNow(); threadPool.awaitTermination(10, TimeUnit.SECONDS); } } - public void testTimeBoundedDeletion() throws Exception { + public void testSuccessfulTimeBoundedDeletion() throws Exception { + timeBoundedDeletion(true); + } + + public void testFailureTimeBoundedDeletion() throws Exception { + timeBoundedDeletion(false); + } + + private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception { try (ThreadPool threadPool = new TestThreadPool("slm-test"); ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); Client noOpClient = new NoOpClient("slm-test")) { - 
SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy("policy", "snap", "1 * * * * ?", - "repo", null, new SnapshotRetentionConfiguration(null, null,1)); + final String policyId = "policy"; + final String repoId = "repo"; + SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyId, "snap", "1 * * * * ?", + repoId, null, new SnapshotRetentionConfiguration(null, null, 1)); ClusterState state = createState(policy); state = ClusterState.builder(state) @@ -192,42 +244,61 @@ public void testTimeBoundedDeletion() throws Exception { ClusterServiceUtils.setState(clusterService, state); final SnapshotInfo snap1 = new SnapshotInfo(new SnapshotId("name1", "uuid1"), Collections.singletonList("index"), - 0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + 0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); final SnapshotInfo snap2 = new SnapshotInfo(new SnapshotId("name2", "uuid2"), Collections.singletonList("index"), - 1L, null, 2L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + 1L, null, 2L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); final SnapshotInfo snap3 = new SnapshotInfo(new SnapshotId("name3", "uuid3"), Collections.singletonList("index"), - 2L, null, 3L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + 2L, null, 3L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); final SnapshotInfo snap4 = new SnapshotInfo(new SnapshotId("name4", "uuid4"), Collections.singletonList("index"), - 3L, null, 4L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + 3L, null, 4L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); final SnapshotInfo snap5 = new SnapshotInfo(new SnapshotId("name5", "uuid5"), Collections.singletonList("index"), - 4L, null, 5L, 1, Collections.emptyList(), true, 
Collections.singletonMap("policy", "policy")); + 4L, null, 5L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); final Set deleted = ConcurrentHashMap.newKeySet(); // We're expected two deletions before they hit the "taken too long" test, so have a latch of 2 - CountDownLatch latch = new CountDownLatch(2); + CountDownLatch deletionLatch = new CountDownLatch(2); + CountDownLatch historyLatch = new CountDownLatch(2); + Set deletedSnapshotsInHistory = ConcurrentHashMap.newKeySet(); AtomicLong nanos = new AtomicLong(System.nanoTime()); - OverrideDeleteSnapshotRetentionTask retentionTask = new OverrideDeleteSnapshotRetentionTask(noOpClient, clusterService, + MockSnapshotRetentionTask retentionTask = new MockSnapshotRetentionTask(noOpClient, clusterService, + new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC, + (historyItem) -> { + assertEquals(deletionSuccess, historyItem.isSuccess()); + if (historyItem.isSuccess() == false) { + assertThat(historyItem.getErrorDetails(), containsString("deletion_failed")); + } + assertEquals(policyId, historyItem.getPolicyId()); + assertEquals(repoId, historyItem.getRepository()); + assertEquals(DELETE_OPERATION, historyItem.getOperation()); + deletedSnapshotsInHistory.add(historyItem.getSnapshotName()); + historyLatch.countDown(); + }), () -> { List snaps = Arrays.asList(snap1, snap2, snap3, snap4, snap5); logger.info("--> retrieving snapshots [{}]", snaps); - return Collections.singletonMap("repo", snaps); + return Collections.singletonMap(repoId, snaps); }, - (repo, snapshotId) -> { - logger.info("--> deleting {}", snapshotId); + (deletionPolicyId, repo, snapId, slmStats, listener) -> { + logger.info("--> deleting {}", snapId); // Don't pause until snapshot 2 - if (snapshotId.equals(snap2.snapshotId())) { + if (snapId.equals(snap2.snapshotId())) { logger.info("--> pausing for 501ms while deleting snap2 to simulate deletion past a threshold"); 
nanos.addAndGet(TimeValue.timeValueMillis(501).nanos()); } - deleted.add(snapshotId); - latch.countDown(); + deleted.add(snapId); + if (deletionSuccess) { + listener.onResponse(new AcknowledgedResponse(true)); + } else { + listener.onFailure(new RuntimeException("deletion_failed")); + } + deletionLatch.countDown(); }, nanos::get); long time = System.currentTimeMillis(); retentionTask.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_JOB_ID, time, time)); - boolean success = latch.await(10, TimeUnit.SECONDS); + boolean success = deletionLatch.await(10, TimeUnit.SECONDS); assertThat("expected 2 snapshot deletions within 10 seconds, deleted: " + deleted, success, equalTo(true)); @@ -235,6 +306,10 @@ public void testTimeBoundedDeletion() throws Exception { assertThat("two snapshots should have been deleted", deleted.size(), equalTo(2)); assertThat(deleted, containsInAnyOrder(snap1.snapshotId(), snap2.snapshotId())); + boolean historySuccess = historyLatch.await(10, TimeUnit.SECONDS); + assertThat("expected history entries for 2 snapshot deletions", historySuccess, equalTo(true)); + assertThat(deletedSnapshotsInHistory, containsInAnyOrder(snap1.snapshotId().getName(), snap2.snapshotId().getName())); + threadPool.shutdownNow(); threadPool.awaitTermination(10, TimeUnit.SECONDS); } @@ -261,17 +336,18 @@ public ClusterState createState(SnapshotLifecyclePolicy... 
policies) { } private static class MockSnapshotRetentionTask extends SnapshotRetentionTask { - private final Supplier>> snapshotRetriever; - private final Consumer>> snapshotDeleter; + private final DeleteSnapshotMock deleteRunner; MockSnapshotRetentionTask(Client client, ClusterService clusterService, + SnapshotHistoryStore historyStore, Supplier>> snapshotRetriever, - Consumer>> snapshotDeleter) { - super(client, clusterService, System::nanoTime); + DeleteSnapshotMock deleteRunner, + LongSupplier nanoSupplier) { + super(client, clusterService, nanoSupplier, historyStore); this.snapshotRetriever = snapshotRetriever; - this.snapshotDeleter = snapshotDeleter; + this.deleteRunner = deleteRunner; } @Override @@ -282,35 +358,15 @@ void getAllSuccessfulSnapshots(Collection repositories, } @Override - void deleteSnapshots(Map> snapshotsToDelete, TimeValue maxDeleteTime, SnapshotLifecycleStats slmStats) { - this.snapshotDeleter.accept(snapshotsToDelete); + void deleteSnapshot(String policyId, String repo, SnapshotId snapshot, SnapshotLifecycleStats slmStats, + ActionListener listener) { + deleteRunner.apply(policyId, repo, snapshot, slmStats, listener); } } - private static class OverrideDeleteSnapshotRetentionTask extends SnapshotRetentionTask { - private final Supplier>> snapshotRetriever; - private final BiConsumer deleteRunner; - - OverrideDeleteSnapshotRetentionTask(Client client, - ClusterService clusterService, - Supplier>> snapshotRetriever, - BiConsumer deleteRunner, - LongSupplier nanoSupplier) { - super(client, clusterService, nanoSupplier); - this.snapshotRetriever = snapshotRetriever; - this.deleteRunner = deleteRunner; - } - - @Override - void getAllSuccessfulSnapshots(Collection repositories, - ActionListener>> listener, - Consumer errorHandler) { - listener.onResponse(this.snapshotRetriever.get()); - } - - @Override - void deleteSnapshot(String policyId, String repo, SnapshotId snapshot, SnapshotLifecycleStats slmStats) { - deleteRunner.accept(repo, 
snapshot); - } + @FunctionalInterface + interface DeleteSnapshotMock { + void apply(String policyId, String repo, SnapshotId snapshot, SnapshotLifecycleStats slmStats, + ActionListener listener); } }