From c2fb4b3648d64ccd50d239967035a269259986d9 Mon Sep 17 00:00:00 2001 From: Gordon Brown Date: Fri, 26 Jul 2019 17:08:09 -0600 Subject: [PATCH 01/11] Record history of SLM retention actions This commit records the deletion of snapshots by the retention component of SLM into the SLM history index for the purposes of alerting and alerting. --- .../history/SnapshotHistoryItem.java | 41 +++++++++++++------ .../history/SnapshotHistoryStoreTests.java | 6 +-- .../xpack/ilm/IndexLifecycle.java | 4 +- .../xpack/slm/SnapshotLifecycleTask.java | 6 ++- .../xpack/slm/SnapshotRetentionTask.java | 20 ++++++++- .../slm/SnapshotRetentionServiceTests.java | 3 +- .../xpack/slm/SnapshotRetentionTaskTests.java | 6 ++- 7 files changed, 64 insertions(+), 22 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotHistoryItem.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotHistoryItem.java index 8120be9683fb7..8666f3062f10f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotHistoryItem.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotHistoryItem.java @@ -15,7 +15,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -41,6 +40,8 @@ public class SnapshotHistoryItem implements Writeable, ToXContentObject { static final ParseField OPERATION = new ParseField("operation"); static final ParseField SUCCESS = new ParseField("success"); private static final String CREATE_OPERATION = "CREATE"; + private static final String DELETE_OPERATION = "DELETE"; + protected final long timestamp; protected final String policyId; protected final String repository; @@ -98,25 +99,29 @@ public static SnapshotHistoryItem parse(XContentParser parser, String name) { this.errorDetails = errorDetails; } - public static SnapshotHistoryItem successRecord(long timestamp, SnapshotLifecyclePolicy policy, String snapshotName) { + public static SnapshotHistoryItem creationSuccessRecord(long timestamp, SnapshotLifecyclePolicy policy, String snapshotName) { return new SnapshotHistoryItem(timestamp, policy.getId(), policy.getRepository(), snapshotName, CREATE_OPERATION, true, policy.getConfig(), null); } - public static SnapshotHistoryItem failureRecord(long timeStamp, SnapshotLifecyclePolicy policy, String snapshotName, - Exception exception) throws IOException { - ToXContent.Params stacktraceParams = new ToXContent.MapParams(Collections.singletonMap(REST_EXCEPTION_SKIP_STACK_TRACE, "false")); - String exceptionString; - try (XContentBuilder causeXContentBuilder = JsonXContent.contentBuilder()) { - causeXContentBuilder.startObject(); - ElasticsearchException.generateThrowableXContent(causeXContentBuilder, stacktraceParams, exception); - causeXContentBuilder.endObject(); - exceptionString = BytesReference.bytes(causeXContentBuilder).utf8ToString(); - } + public static SnapshotHistoryItem creationFailureRecord(long timeStamp, SnapshotLifecyclePolicy policy, String snapshotName, + Exception exception) throws IOException { + String exceptionString = exceptionToString(exception); return new SnapshotHistoryItem(timeStamp, policy.getId(), policy.getRepository(), snapshotName, CREATE_OPERATION, false, policy.getConfig(), exceptionString); } + public static SnapshotHistoryItem deletionSuccessRecord(long timestamp, String snapshotName, String policyId, String repository) { + return new SnapshotHistoryItem(timestamp, policyId, repository, snapshotName, DELETE_OPERATION, true, null, null); + } + + public static SnapshotHistoryItem deletionFailureRecord(long timestamp, String snapshotName, String policyId, String repository, + Exception exception) throws IOException { + String exceptionString = exceptionToString(exception); + return new SnapshotHistoryItem(timestamp, policyId, repository, snapshotName, CREATE_OPERATION, false, + null, exceptionString); + } + public SnapshotHistoryItem(StreamInput in) throws IOException { this.timestamp = in.readVLong(); this.policyId = in.readString(); @@ -220,4 +225,16 @@ public int hashCode() { public String toString() { return Strings.toString(this); } + + private static String exceptionToString(Exception exception) throws IOException { + Params stacktraceParams = new MapParams(Collections.singletonMap(REST_EXCEPTION_SKIP_STACK_TRACE, "false")); + String exceptionString; + try (XContentBuilder causeXContentBuilder = JsonXContent.contentBuilder()) { + causeXContentBuilder.startObject(); + ElasticsearchException.generateThrowableXContent(causeXContentBuilder, stacktraceParams, exception); + causeXContentBuilder.endObject(); + exceptionString = BytesReference.bytes(causeXContentBuilder).utf8ToString(); + } + return exceptionString; + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotHistoryStoreTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotHistoryStoreTests.java index 8324741198159..9ee83c23b555c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotHistoryStoreTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotHistoryStoreTests.java @@ -59,7 +59,7 @@ public void testNoActionIfDisabled() { final long timestamp = randomNonNegativeLong(); SnapshotLifecyclePolicy.ResolverContext context = new SnapshotLifecyclePolicy.ResolverContext(timestamp); String snapshotId = policy.generateSnapshotName(context); - SnapshotHistoryItem record = SnapshotHistoryItem.successRecord(timestamp, policy, snapshotId); + SnapshotHistoryItem record = SnapshotHistoryItem.creationSuccessRecord(timestamp, policy, snapshotId); client.setVerifier((a,r,l) -> { fail("the history store is disabled, no action should have been taken"); @@ -76,7 +76,7 @@ public void testPut() throws Exception { SnapshotLifecyclePolicy.ResolverContext context = new SnapshotLifecyclePolicy.ResolverContext(timestamp); String snapshotId = policy.generateSnapshotName(context); { - SnapshotHistoryItem record = SnapshotHistoryItem.successRecord(timestamp, policy, snapshotId); + SnapshotHistoryItem record = SnapshotHistoryItem.creationSuccessRecord(timestamp, policy, snapshotId); AtomicInteger calledTimes = new AtomicInteger(0); client.setVerifier((action, request, listener) -> { @@ -111,7 +111,7 @@ public void testPut() throws Exception { { final String cause = randomAlphaOfLength(9); Exception failureException = new RuntimeException(cause); - SnapshotHistoryItem record = SnapshotHistoryItem.failureRecord(timestamp, policy, snapshotId, failureException); + SnapshotHistoryItem record = SnapshotHistoryItem.creationFailureRecord(timestamp, policy, snapshotId, failureException); AtomicInteger calledTimes = new AtomicInteger(0); client.setVerifier((action, request, listener) -> { diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java index 66faade884057..312c89db00c58 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java @@ -154,8 +154,8 @@ public Collection createComponents(Client client, ClusterService cluster snapshotHistoryStore.set(new SnapshotHistoryStore(settings, client, getClock().getZone())); snapshotLifecycleService.set(new SnapshotLifecycleService(settings, () -> new SnapshotLifecycleTask(client, clusterService, snapshotHistoryStore.get()), clusterService, getClock())); - snapshotRetentionService.set(new SnapshotRetentionService(settings, () -> new SnapshotRetentionTask(client, clusterService), - clusterService, getClock())); + snapshotRetentionService.set(new SnapshotRetentionService(settings, + () -> new SnapshotRetentionTask(client, clusterService, snapshotHistoryStore.get()), clusterService, getClock())); return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotHistoryStore.get(), snapshotRetentionService.get()); } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java index b15a5d46145c1..60ee0b32049d8 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java @@ -94,7 +94,8 @@ public void onResponse(CreateSnapshotResponse createSnapshotResponse) { final long timestamp = Instant.now().toEpochMilli(); clusterService.submitStateUpdateTask("slm-record-success-" + policyMetadata.getPolicy().getId(), WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), timestamp)); - historyStore.putAsync(SnapshotHistoryItem.successRecord(timestamp, policyMetadata.getPolicy(), request.snapshot())); + historyStore.putAsync(SnapshotHistoryItem.creationSuccessRecord(timestamp, policyMetadata.getPolicy(), + request.snapshot())); } @Override @@ -106,7 +107,8 @@ public void onFailure(Exception e) { WriteJobStatus.failure(policyMetadata.getPolicy().getId(), request.snapshot(), timestamp, e)); final SnapshotHistoryItem failureRecord; try { - failureRecord = SnapshotHistoryItem.failureRecord(timestamp, policyMetadata.getPolicy(), request.snapshot(), e); + failureRecord = SnapshotHistoryItem.creationFailureRecord(timestamp, policyMetadata.getPolicy(), + request.snapshot(), e); historyStore.putAsync(failureRecord); } catch (IOException ex) { // This shouldn't happen unless there's an issue with serializing the original exception, which shouldn't happen diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index b71366feea60e..20abe52b89aff 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -24,7 +24,11 @@ import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecycleMetadata; import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy; import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotRetentionConfiguration; +import org.elasticsearch.xpack.core.snapshotlifecycle.history.SnapshotHistoryItem; +import org.elasticsearch.xpack.core.snapshotlifecycle.history.SnapshotHistoryStore; +import java.io.IOException; +import java.time.Instant; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -50,10 +54,12 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener { private final Client client; private final ClusterService clusterService; + private final SnapshotHistoryStore historyStore; - public SnapshotRetentionTask(Client client, ClusterService clusterService) { + public SnapshotRetentionTask(Client client, ClusterService clusterService, SnapshotHistoryStore historyStore) { this.client = new OriginSettingClient(client, ClientHelper.INDEX_LIFECYCLE_ORIGIN); this.clusterService = clusterService; + this.historyStore = historyStore; } @Override @@ -200,12 +206,15 @@ void deleteSnapshots(Map> snapshotsToDelete) { snapshots.forEach(info -> { logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, info.snapshotId()); CountDownLatch latch = new CountDownLatch(1); + String policyId = (String) info.userMetadata().get("policy"); // TODO NOCOMMIT: Make this less fragile client.admin().cluster().prepareDeleteSnapshot(repo, info.snapshotId().getName()) .execute(new LatchedActionListener<>(new ActionListener<>() { @Override public void onResponse(AcknowledgedResponse acknowledgedResponse) { if (acknowledgedResponse.isAcknowledged()) { logger.debug("[{}] snapshot [{}] deleted successfully", repo, info.snapshotId()); + historyStore.putAsync(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(), + info.snapshotId().getName(), policyId, repo)); } } @@ -213,6 +222,15 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) { public void onFailure(Exception e) { logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention", repo, info.snapshotId()), e); + try { + historyStore.putAsync(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), + info.snapshotId().getName(), policyId, repo, e)); + } catch (IOException ex) { + // This shouldn't happen unless there's an issue with serializing the original exception + logger.error(new ParameterizedMessage( + "failed to record snapshot creation failure for snapshot lifecycle policy [{}]", + policyId), e); + } } }, latch)); try { diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java index 04a763224f31b..c3b0e9e1881c0 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; +import org.elasticsearch.xpack.core.snapshotlifecycle.history.SnapshotHistoryStore; import org.elasticsearch.xpack.core.watcher.watch.ClockMock; import java.util.Collections; @@ -67,7 +68,7 @@ public void testJobsAreScheduled() { private static class FakeRetentionTask extends SnapshotRetentionTask { FakeRetentionTask() { - super(mock(Client.class), null); + super(mock(Client.class), null, mock(SnapshotHistoryStore.class)); } @Override diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java index fb560798907d6..6c1ee9cebd443 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy; import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicyMetadata; import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotRetentionConfiguration; +import org.elasticsearch.xpack.core.snapshotlifecycle.history.SnapshotHistoryStore; import java.util.ArrayList; import java.util.Arrays; @@ -139,6 +140,8 @@ public void testRetentionTask() throws Exception { AtomicReference> deleted = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); MockSnapshotRetentionTask retentionTask = new MockSnapshotRetentionTask(noOpClient, clusterService, + null,//new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC, + // (historyItem) -> fail("actually test this")), () -> { List snaps = new ArrayList<>(2); snaps.add(eligibleSnapshot); @@ -191,9 +194,10 @@ private class MockSnapshotRetentionTask extends SnapshotRetentionTask { MockSnapshotRetentionTask(Client client, ClusterService clusterService, + SnapshotHistoryStore historyStore, Supplier>> snapshotRetriever, Consumer>> snapshotDeleter) { - super(client, clusterService); + super(client, clusterService, historyStore); this.snapshotRetriever = snapshotRetriever; this.snapshotDeleter = snapshotDeleter; } From 14aa8fa652ffc71bb4a1d3533c913442aa4a4a10 Mon Sep 17 00:00:00 2001 From: Gordon Brown Date: Mon, 12 Aug 2019 12:04:35 -0600 Subject: [PATCH 02/11] Use constant and fix formatting --- .../xpack/slm/SnapshotRetentionTask.java | 50 ++++++++++--------- .../xpack/slm/SnapshotRetentionTaskTests.java | 2 +- 2 files changed, 27 insertions(+), 25 deletions(-) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index 9ac1f099fa114..e2ddf3a892915 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -45,6 +45,8 @@ import java.util.function.LongSupplier; import java.util.stream.Collectors; +import static org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy.POLICY_ID_METADATA_FIELD; + /** * The {@code SnapshotRetentionTask} is invoked by the scheduled job from the * {@link SnapshotRetentionService}. It is responsible for retrieving the snapshots for repositories @@ -138,7 +140,7 @@ static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, Map Optional.ofNullable(info.userMetadata()) - .map(meta -> meta.get(SnapshotLifecyclePolicy.POLICY_ID_METADATA_FIELD)) + .map(meta -> meta.get(POLICY_ID_METADATA_FIELD)) .map(pId -> pId.equals(policyId)) .orElse(false)) .collect(Collectors.toList())) @@ -244,15 +246,15 @@ void deleteSnapshots(Map> snapshotsToDelete, TimeValu void deleteSnapshot(String repo, SnapshotInfo info) { logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, info.snapshotId()); CountDownLatch latch = new CountDownLatch(1); - String policyId = (String) info.userMetadata().get("policy"); // TODO: Make this less fragile - client.admin().cluster().prepareDeleteSnapshot(repo, info.snapshotId().getName()) + String policyId = (String) info.userMetadata().get(POLICY_ID_METADATA_FIELD); + client.admin().cluster().prepareDeleteSnapshot(repo, info.snapshotId().getName()) .execute(new LatchedActionListener<>(new ActionListener<>() { @Override public void onResponse(AcknowledgedResponse acknowledgedResponse) { if (acknowledgedResponse.isAcknowledged()) { logger.debug("[{}] snapshot [{}] deleted successfully", repo, info.snapshotId()); - historyStore.putAsync(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(), - info.snapshotId().getName(), policyId, repo)); + historyStore.putAsync(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(), + info.snapshotId().getName(), policyId, repo)); } } @@ -260,24 +262,24 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) { public void onFailure(Exception e) { logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention", repo, info.snapshotId()), e); - try { - historyStore.putAsync(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), - info.snapshotId().getName(), policyId, repo, e)); - } catch (IOException ex) { - // This shouldn't happen unless there's an issue with serializing the original exception - logger.error(new ParameterizedMessage( - "failed to record snapshot creation failure for snapshot lifecycle policy [{}]", - policyId), e); - } - } - }, latch)); - try { - // Deletes cannot occur simultaneously, so wait for this - // deletion to complete before attempting the next one - latch.await(); - } catch (InterruptedException e) { - logger.error(new ParameterizedMessage("[{}] deletion of snapshot [{}] interrupted", - repo, info.snapshotId()), e); + try { + historyStore.putAsync(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), + info.snapshotId().getName(), policyId, repo, e)); + } catch (IOException ex) { + // This shouldn't happen unless there's an issue with serializing the original exception + logger.error(new ParameterizedMessage( + "failed to record snapshot creation failure for snapshot lifecycle policy [{}]", + policyId), e); + } + } + }, latch)); + try { + // Deletes cannot occur simultaneously, so wait for this + // deletion to complete before attempting the next one + latch.await(); + } catch (InterruptedException e) { + logger.error(new ParameterizedMessage("[{}] deletion of snapshot [{}] interrupted", + repo, info.snapshotId()), e); } } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java index 7b3e70c4e2150..cfa876236f119 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -149,7 +149,7 @@ public void testRetentionTask() throws Exception { CountDownLatch latch = new CountDownLatch(1); MockSnapshotRetentionTask retentionTask = new MockSnapshotRetentionTask(noOpClient, clusterService, null,//new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC, - // (historyItem) -> fail("actually test this")), + // (historyItem) -> fail("actually test this")), //TODO: fix this () -> { List snaps = new ArrayList<>(2); snaps.add(eligibleSnapshot); From 31f6c2676cb39d91478b810c37c75bc80357b87b Mon Sep 17 00:00:00 2001 From: Gordon Brown Date: Tue, 13 Aug 2019 14:42:56 -0600 Subject: [PATCH 03/11] Tests & fixes for issues that caused failures --- .../core/slm/history/SnapshotHistoryItem.java | 7 +- .../xpack/slm/SnapshotLifecycleIT.java | 19 +- .../xpack/slm/SnapshotRetentionTask.java | 28 ++- .../xpack/slm/SnapshotRetentionTaskTests.java | 207 ++++++++++++------ 4 files changed, 171 insertions(+), 90 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryItem.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryItem.java index 4d70f5376b2e8..6a3836d065c30 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryItem.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryItem.java @@ -39,8 +39,9 @@ public class SnapshotHistoryItem implements Writeable, ToXContentObject { static final ParseField SNAPSHOT_NAME = new ParseField("snapshot_name"); static final ParseField OPERATION = new ParseField("operation"); static final ParseField SUCCESS = new ParseField("success"); - private static final String CREATE_OPERATION = "CREATE"; - private static final String DELETE_OPERATION = "DELETE"; + + public static final String CREATE_OPERATION = "CREATE"; + public static final String DELETE_OPERATION = "DELETE"; protected final long timestamp; protected final String policyId; @@ -118,7 +119,7 @@ public static SnapshotHistoryItem deletionSuccessRecord(long timestamp, String s public static SnapshotHistoryItem deletionFailureRecord(long timestamp, String snapshotName, String policyId, String repository, Exception exception) throws IOException { String exceptionString = exceptionToString(exception); - return new SnapshotHistoryItem(timestamp, policyId, repository, snapshotName, CREATE_OPERATION, false, + return new SnapshotHistoryItem(timestamp, policyId, repository, snapshotName, DELETE_OPERATION, false, null, exceptionString); } diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java index 6b6a340c6b008..764cacfae75d7 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java @@ -36,10 +36,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.Optional; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem.CREATE_OPERATION; +import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem.DELETE_OPERATION; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -98,7 +100,7 @@ public void testFullPolicySnapshot() throws Exception { Map metadata = (Map) snapResponse.get(0).get("metadata"); assertNotNull(metadata); assertThat(metadata.get("policy"), equalTo(policyName)); - assertHistoryIsPresent(policyName, true, repoId); + assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION); // Check that the last success date was written to the cluster state Request getReq = new Request("GET", "/_slm/policy/" + policyName); @@ -119,7 +121,7 @@ public void testFullPolicySnapshot() throws Exception { String lastSnapshotName = (String) lastSuccessObject.get("snapshot_name"); assertThat(lastSnapshotName, startsWith("snap-")); - assertHistoryIsPresent(policyName, true, repoId); + assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION); }); Request delReq = new Request("DELETE", "/_slm/policy/" + policyName); @@ -170,7 +172,7 @@ public void testPolicyFailure() throws Exception { assertNotNull(snapshotName); assertThat(snapshotName, startsWith("snap-")); } - assertHistoryIsPresent(policyName, false, repoName); + assertHistoryIsPresent(policyName, false, repoName, CREATE_OPERATION); }); Request delReq = new Request("DELETE", "/_slm/policy/" + policyName); @@ -214,7 +216,7 @@ public void testPolicyManualExecution() throws Exception { final Map metadata = extractMetadata(snapshotResponseMap, snapshotName); assertNotNull(metadata); assertThat(metadata.get("policy"), equalTo(policyName)); - assertHistoryIsPresent(policyName, true, repoId); + assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION); } catch (ResponseException e) { fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity())); } @@ -272,7 +274,7 @@ public void testBasicTimeBasedRetenion() throws Exception { final Map metadata = extractMetadata(snapshotResponseMap, snapshotName); assertNotNull(metadata); assertThat(metadata.get("policy"), equalTo(policyName)); - assertHistoryIsPresent(policyName, true, repoId); + assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION); } catch (ResponseException e) { fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity())); } @@ -300,6 +302,7 @@ public void testBasicTimeBasedRetenion() throws Exception { } catch (ResponseException e) { assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception")); } + assertHistoryIsPresent(policyName, true, repoId, DELETE_OPERATION); }, 60, TimeUnit.SECONDS); Request delReq = new Request("DELETE", "/_slm/policy/" + policyName); @@ -403,7 +406,7 @@ private static Map extractMetadata(Map snapshotR } // This method should be called inside an assertBusy, it has no retry logic of its own - private void assertHistoryIsPresent(String policyName, boolean success, String repository) throws IOException { + private void assertHistoryIsPresent(String policyName, boolean success, String repository, String operation) throws IOException { final Request historySearchRequest = new Request("GET", ".slm-history*/_search"); historySearchRequest.setJsonEntity("{\n" + " \"query\": {\n" + @@ -426,7 +429,7 @@ private void assertHistoryIsPresent(String policyName, boolean success, String r " },\n" + " {\n" + " \"term\": {\n" + - " \"operation\": \"CREATE\"\n" + + " \"operation\": \"" + operation + "\"\n" + " }\n" + " }\n" + " ]\n" + diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index e2ddf3a892915..015b8b9c5b8e9 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -41,6 +41,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -220,20 +221,28 @@ void deleteSnapshots(Map> snapshotsToDelete, TimeValu logger.info("starting snapshot retention deletion for [{}] snapshots", count); long startTime = nowNanoSupplier.getAsLong(); int deleted = 0; + int failed = 0; for (Map.Entry> entry : snapshotsToDelete.entrySet()) { String repo = entry.getKey(); List snapshots = entry.getValue(); for (SnapshotInfo info : snapshots) { - deleteSnapshot(repo, info); - deleted++; + Optional result = deleteSnapshot(repo, info); + if (result.isPresent()) { + if (result.get().isSuccess()) { + deleted++; + } + historyStore.putAsync(result.get()); + } else { + failed++; + } // Check whether we have exceeded the maximum time allowed to spend deleting // snapshots, if we have, short-circuit the rest of the deletions TimeValue elapsedDeletionTime = TimeValue.timeValueNanos(nowNanoSupplier.getAsLong() - startTime); logger.trace("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), elapsedDeletionTime); if (elapsedDeletionTime.compareTo(maximumTime) > 0) { logger.info("maximum snapshot retention deletion time reached, time spent: [{}]," + - " maximum allowed time: [{}], deleted {} out of {} snapshots scheduled for deletion", - elapsedDeletionTime, maximumTime, deleted, count); + " maximum allowed time: [{}], deleted [{}] out of [{}] snapshots scheduled for deletion, failed to delete [{}]", + elapsedDeletionTime, maximumTime, deleted, count, failed); return; } } @@ -242,18 +251,22 @@ void deleteSnapshots(Map> snapshotsToDelete, TimeValu /** * Delete the given snapshot from the repository in blocking manner + * @param repo The repository the snapshot is in + * @param info The snapshot metadata + * @return If present, a SnapshotHistoryItem containing the results of the deletion. Empty if no response or interrupted. */ - void deleteSnapshot(String repo, SnapshotInfo info) { + Optional deleteSnapshot(String repo, SnapshotInfo info) { logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, info.snapshotId()); CountDownLatch latch = new CountDownLatch(1); String policyId = (String) info.userMetadata().get(POLICY_ID_METADATA_FIELD); + AtomicReference result = new AtomicReference<>(); client.admin().cluster().prepareDeleteSnapshot(repo, info.snapshotId().getName()) .execute(new LatchedActionListener<>(new ActionListener<>() { @Override public void onResponse(AcknowledgedResponse acknowledgedResponse) { if (acknowledgedResponse.isAcknowledged()) { logger.debug("[{}] snapshot [{}] deleted successfully", repo, info.snapshotId()); - historyStore.putAsync(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(), + result.set(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(), info.snapshotId().getName(), policyId, repo)); } } @@ -263,7 +276,7 @@ public void onFailure(Exception e) { logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention", repo, info.snapshotId()), e); try { - historyStore.putAsync(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), + result.set(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), info.snapshotId().getName(), policyId, repo, e)); } catch (IOException ex) { // This shouldn't happen unless there's an issue with serializing the original exception @@ -281,5 +294,6 @@ public void onFailure(Exception e) { logger.error(new ParameterizedMessage("[{}] deletion of snapshot [{}] interrupted", repo, info.snapshotId()), e); } + return Optional.ofNullable(result.get()); } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java index cfa876236f119..5a00d3b5c6f6a 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -28,29 +28,39 @@ import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy; import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyMetadata; import org.elasticsearch.xpack.core.slm.SnapshotRetentionConfiguration; +import org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem; import org.elasticsearch.xpack.core.slm.history.SnapshotHistoryStore; +import java.io.IOException; +import java.time.Instant; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem.DELETE_OPERATION; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.not; public class SnapshotRetentionTaskTests extends ESTestCase { @@ -128,62 +138,113 @@ public void testSnapshotEligibleForDeletion() { assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(false)); } - public void testRetentionTask() throws Exception { + public void testRentionTaskSuccess() throws Exception { + retentionTaskTest(true); + } + + public void testRentionTaskFailure() throws Exception { + retentionTaskTest(false); + } + + private void retentionTaskTest(final boolean deletionSuccess) throws Exception { try (ThreadPool threadPool = new TestThreadPool("slm-test"); ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); Client noOpClient = new NoOpClient("slm-test")) { - SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy("policy", "snap", "1 * * * * ?", - "repo", null, new SnapshotRetentionConfiguration(TimeValue.timeValueDays(30), null, null)); + final String policyId = "policy"; + final String repoId = "repo"; + SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyId, "snap", "1 * * * * ?", + repoId, null, new SnapshotRetentionConfiguration(TimeValue.timeValueDays(30), null, null)); ClusterState state = createState(policy); ClusterServiceUtils.setState(clusterService, state); final SnapshotInfo eligibleSnapshot = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), - 0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + 0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); final SnapshotInfo ineligibleSnapshot = new SnapshotInfo(new SnapshotId("name2", "uuid2"), Collections.singletonList("index"), System.currentTimeMillis(), null, System.currentTimeMillis() + 1, 1, - Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); + + Set deleted = ConcurrentHashMap.newKeySet(); + Set deletedSnapshotsInHistory = ConcurrentHashMap.newKeySet(); + CountDownLatch deletionLatch = new CountDownLatch(1); + CountDownLatch historyLatch = new CountDownLatch(1); - AtomicReference> deleted = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); MockSnapshotRetentionTask retentionTask = new MockSnapshotRetentionTask(noOpClient, clusterService, - null,//new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC, - // (historyItem) -> fail("actually test this")), //TODO: fix this + new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC, + (historyItem) -> { + assertEquals(deletionSuccess, historyItem.isSuccess()); + if (historyItem.isSuccess() == false) { + assertThat(historyItem.getErrorDetails(), containsString("deletion_failed")); + } + assertEquals(policyId, historyItem.getPolicyId()); + assertEquals(repoId, historyItem.getRepository()); + assertEquals(DELETE_OPERATION, historyItem.getOperation()); + deletedSnapshotsInHistory.add(historyItem.getSnapshotName()); + historyLatch.countDown(); + }), () -> { List snaps = new ArrayList<>(2); snaps.add(eligibleSnapshot); snaps.add(ineligibleSnapshot); logger.info("--> retrieving snapshots [{}]", snaps); - return Collections.singletonMap("repo", snaps); + return Collections.singletonMap(repoId, snaps); + }, + (repo, snapInfo) -> { + logger.info("--> deleting {} from repo {}", snapInfo, repo); + deleted.add(snapInfo); + deletionLatch.countDown(); + if (deletionSuccess) { + return Optional.of(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(), + snapInfo.snapshotId().getName(), policy.getId(), repo)); + } else { + try { + return Optional.of(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), + snapInfo.snapshotId().getName(), policy.getId(), repo, new RuntimeException("deletion_failed"))); + } catch (IOException e) { + logger.error(e); + fail("failed to serialize an exception to json, this should never happen"); + return Optional.empty(); // impossible to hit this but necessary to make the compiler happy + } + } }, - snapsToDelete -> { - logger.info("--> deleting {}", snapsToDelete); - deleted.set(snapsToDelete.values().stream().flatMap(Collection::stream).collect(Collectors.toList())); - latch.countDown(); - }); + System::nanoTime); long time = System.currentTimeMillis(); retentionTask.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_JOB_ID, time, time)); - latch.await(10, TimeUnit.SECONDS); + deletionLatch.await(10, TimeUnit.SECONDS); + + assertThat("something should have been deleted", deleted, not(empty())); + assertThat("one snapshot should have been deleted", deleted, hasSize(1)); + assertThat(deleted, contains(eligibleSnapshot)); - assertNotNull("something should have been deleted", deleted.get()); - assertThat("one snapshot should have been deleted", deleted.get().size(), equalTo(1)); - assertThat(deleted.get().get(0), equalTo(eligibleSnapshot)); + boolean historySuccess = historyLatch.await(10, TimeUnit.SECONDS); + assertThat("expected history entries for 1 snapshot deletions", historySuccess, equalTo(true)); + assertThat(deletedSnapshotsInHistory, contains(eligibleSnapshot.snapshotId().getName())); threadPool.shutdownNow(); threadPool.awaitTermination(10, TimeUnit.SECONDS); } } - public void testTimeBoundedDeletion() throws Exception { + public void testSuccessfulTimeBoundedDeletion() throws Exception { + timeBoundedDeletion(true); + } + + public void testFailureTimeBoundedDeletion() throws Exception { + timeBoundedDeletion(false); + } + + private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception { try (ThreadPool threadPool = new TestThreadPool("slm-test"); ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); Client noOpClient = new NoOpClient("slm-test")) { - SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy("policy", "snap", "1 * * * * ?", - "repo", null, new SnapshotRetentionConfiguration(null, null,1)); + final String policyId = "policy"; + final String repoId = "repo"; + SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyId, "snap", "1 * * * * ?", + repoId, null, new SnapshotRetentionConfiguration(null, null,1)); ClusterState state = createState(policy); state = ClusterState.builder(state) @@ -194,43 +255,69 @@ public void testTimeBoundedDeletion() throws Exception { ClusterServiceUtils.setState(clusterService, state); final SnapshotInfo snap1 = new SnapshotInfo(new SnapshotId("name1", "uuid1"), Collections.singletonList("index"), - 0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + 0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); final SnapshotInfo snap2 = new SnapshotInfo(new SnapshotId("name2", "uuid2"), Collections.singletonList("index"), - 1L, null, 2L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + 1L, null, 2L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); final SnapshotInfo snap3 = new SnapshotInfo(new SnapshotId("name3", "uuid3"), Collections.singletonList("index"), - 2L, null, 3L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + 2L, null, 3L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); final SnapshotInfo snap4 = new SnapshotInfo(new SnapshotId("name4", "uuid4"), Collections.singletonList("index"), - 3L, null, 4L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + 3L, null, 4L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); final SnapshotInfo snap5 = new SnapshotInfo(new SnapshotId("name5", "uuid5"), Collections.singletonList("index"), - 4L, null, 5L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + 4L, null, 5L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); final Set deleted = ConcurrentHashMap.newKeySet(); // We're expected two deletions before they hit the "taken too long" test, so have a latch of 2 - CountDownLatch latch = new CountDownLatch(2); + CountDownLatch deletionLatch = new CountDownLatch(2); + CountDownLatch historyLatch = new CountDownLatch(2); + Set deletedSnapshotsInHistory = ConcurrentHashMap.newKeySet(); AtomicLong nanos = new AtomicLong(System.nanoTime()); - OverrideDeleteSnapshotRetentionTask retentionTask = new OverrideDeleteSnapshotRetentionTask(noOpClient, clusterService, - null, + MockSnapshotRetentionTask retentionTask = new MockSnapshotRetentionTask(noOpClient, clusterService, + new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC, + (historyItem) -> { + assertEquals(deletionSuccess, historyItem.isSuccess()); + if (historyItem.isSuccess() == false) { + assertThat(historyItem.getErrorDetails(), containsString("deletion_failed")); + } + assertEquals(policyId, historyItem.getPolicyId()); + assertEquals(repoId, historyItem.getRepository()); + assertEquals(DELETE_OPERATION, historyItem.getOperation()); + deletedSnapshotsInHistory.add(historyItem.getSnapshotName()); + historyLatch.countDown(); + }), () -> { List snaps = Arrays.asList(snap1, snap2, snap3, snap4, snap5); logger.info("--> retrieving snapshots [{}]", snaps); - return Collections.singletonMap("repo", snaps); + return Collections.singletonMap(repoId, snaps); }, - (repo, snapshotInfo) -> { - logger.info("--> deleting {}", snapshotInfo.snapshotId()); + (repo, snapInfo) -> { + logger.info("--> deleting {}", snapInfo.snapshotId()); // Don't pause until snapshot 2 - if (snapshotInfo.snapshotId().equals(snap2.snapshotId())) { + if (snapInfo.snapshotId().equals(snap2.snapshotId())) { logger.info("--> pausing for 501ms while deleting snap2 to simulate deletion past a threshold"); nanos.addAndGet(TimeValue.timeValueMillis(501).nanos()); } - deleted.add(snapshotInfo.snapshotId()); - latch.countDown(); + deleted.add(snapInfo.snapshotId()); + deletionLatch.countDown(); + if (deletionSuccess) { + return Optional.of(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(), + snapInfo.snapshotId().getName(), policy.getId(), repo)); + } else { + try { + return Optional.of(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), + snapInfo.snapshotId().getName(), policy.getId(), repo, new RuntimeException("deletion_failed"))); + } catch (IOException e) { + logger.error(e); + fail("failed to serialize an exception to json, this should never happen"); + return Optional.empty(); // impossible to hit this but necessary to make the compiler happy + } + } }, nanos::get); long time = System.currentTimeMillis(); retentionTask.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_JOB_ID, time, time)); - boolean success = latch.await(10, TimeUnit.SECONDS); + boolean success = deletionLatch.await(10, TimeUnit.SECONDS); assertThat("expected 2 snapshot deletions within 10 seconds, deleted: " + deleted, success, equalTo(true)); @@ -238,6 +325,10 @@ public void testTimeBoundedDeletion() throws Exception { assertThat("two snapshots should have been deleted", deleted.size(), equalTo(2)); assertThat(deleted, containsInAnyOrder(snap1.snapshotId(), snap2.snapshotId())); + boolean historySuccess = historyLatch.await(10, TimeUnit.SECONDS); + assertThat("expected history entries for 2 snapshot deletions", historySuccess, equalTo(true)); + assertThat(deletedSnapshotsInHistory, containsInAnyOrder(snap1.snapshotId().getName(), snap2.snapshotId().getName())); + threadPool.shutdownNow(); threadPool.awaitTermination(10, TimeUnit.SECONDS); } @@ -263,43 +354,15 @@ public ClusterState createState(SnapshotLifecyclePolicy... policies) { } private static class MockSnapshotRetentionTask extends SnapshotRetentionTask { - private final Supplier>> snapshotRetriever; - private final Consumer>> snapshotDeleter; + private final BiFunction> deleteRunner; MockSnapshotRetentionTask(Client client, ClusterService clusterService, SnapshotHistoryStore historyStore, Supplier>> snapshotRetriever, - Consumer>> snapshotDeleter) { - super(client, clusterService, System::nanoTime, historyStore); - this.snapshotRetriever = snapshotRetriever; - this.snapshotDeleter = snapshotDeleter; - } - - @Override - void getAllSuccessfulSnapshots(Collection repositories, - ActionListener>> listener, - Consumer errorHandler) { - listener.onResponse(this.snapshotRetriever.get()); - } - - @Override - void deleteSnapshots(Map> snapshotsToDelete, TimeValue maxDeleteTime) { - this.snapshotDeleter.accept(snapshotsToDelete); - } - } - - private static class OverrideDeleteSnapshotRetentionTask extends SnapshotRetentionTask { - private final Supplier>> snapshotRetriever; - private final BiConsumer deleteRunner; - - OverrideDeleteSnapshotRetentionTask(Client client, - ClusterService clusterService, - SnapshotHistoryStore historyStore, - Supplier>> snapshotRetriever, - BiConsumer deleteRunner, - LongSupplier nanoSupplier) { + BiFunction> deleteRunner, + LongSupplier nanoSupplier) { super(client, clusterService, nanoSupplier, historyStore); this.snapshotRetriever = snapshotRetriever; this.deleteRunner = deleteRunner; @@ -313,8 +376,8 @@ void getAllSuccessfulSnapshots(Collection repositories, } @Override - void deleteSnapshot(String repo, SnapshotInfo snapshot) { - deleteRunner.accept(repo, snapshot); + Optional deleteSnapshot(String repo, SnapshotInfo snapshot) { + return deleteRunner.apply(repo, snapshot); } } } From b734a77c0bc031747f2728fc41f3176ed9bb35ca Mon Sep 17 00:00:00 2001 From: Gordon Brown Date: Tue, 13 Aug 2019 14:46:21 -0600 Subject: [PATCH 04/11] TODO note --- .../java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index 015b8b9c5b8e9..db4d31368fa80 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -258,7 +258,7 @@ void deleteSnapshots(Map> snapshotsToDelete, TimeValu Optional deleteSnapshot(String repo, SnapshotInfo info) { logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, info.snapshotId()); CountDownLatch latch = new CountDownLatch(1); - String policyId = (String) info.userMetadata().get(POLICY_ID_METADATA_FIELD); + String policyId = (String) info.userMetadata().get(POLICY_ID_METADATA_FIELD); // TODO: use getPolicyID once #45362 is merged AtomicReference result = new AtomicReference<>(); client.admin().cluster().prepareDeleteSnapshot(repo, info.snapshotId().getName()) .execute(new LatchedActionListener<>(new ActionListener<>() { From 14e4b6ed0deb16636cc67652f1af22f6c04075b8 Mon Sep 17 00:00:00 2001 From: Gordon Brown Date: Wed, 14 Aug 2019 17:53:06 -0600 Subject: [PATCH 05/11] Switch to a callback-based approach --- .../core/slm/history/SnapshotHistoryItem.java | 4 ++ .../xpack/slm/SnapshotRetentionTask.java | 57 ++++++++++++------- .../xpack/slm/SnapshotRetentionTaskTests.java | 30 +++++----- 3 files changed, 54 insertions(+), 37 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryItem.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryItem.java index 6a3836d065c30..b262fbea49e46 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryItem.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryItem.java @@ -116,6 +116,10 @@ public static SnapshotHistoryItem deletionSuccessRecord(long timestamp, String s return new SnapshotHistoryItem(timestamp, policyId, repository, snapshotName, DELETE_OPERATION, true, null, null); } + public static SnapshotHistoryItem deletionPossibleSuccessRecord(long timestamp, String snapshotName, String policyId, String repository, String details) { + return new SnapshotHistoryItem(timestamp, policyId, repository, snapshotName, DELETE_OPERATION, true, null, details); + } + public static SnapshotHistoryItem deletionFailureRecord(long timestamp, String snapshotName, String policyId, String repository, Exception exception) throws IOException { String exceptionString = exceptionToString(exception); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index 6a632d1a820b7..d2bdbee11010e 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -41,7 +41,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -256,21 +256,26 @@ void deleteSnapshots(Map> snapshotsToDelete, logger.info("starting snapshot retention deletion for [{}] snapshots", count); long startTime = nowNanoSupplier.getAsLong(); - int deleted = 0; - int failed = 0; + final AtomicInteger deleted = new AtomicInteger(0); + final AtomicInteger failed = new AtomicInteger(0); for (Map.Entry> entry : snapshotsToDelete.entrySet()) { String repo = entry.getKey(); List snapshots = entry.getValue(); for (SnapshotInfo info : snapshots) { - Optional result = deleteSnapshot(getPolicyId(info), repo, info, slmStats); - if (result.isPresent()) { - if (result.get().isSuccess()) { - deleted++; + deleteSnapshot(getPolicyId(info), repo, info, slmStats, historyItem -> { + // This would be nicer if we could use ifPresentOrElse + historyItem.ifPresent(item -> { + if (item.isSuccess()) { + deleted.incrementAndGet(); + } else { + failed.incrementAndGet(); + } + historyStore.putAsync(item); + }); + if (historyItem.isEmpty()) { + failed.incrementAndGet(); } - historyStore.putAsync(result.get()); - } else { - failed++; - } + }); // Check whether we have exceeded the maximum time allowed to spend deleting // snapshots, if we have, short-circuit the rest of the deletions TimeValue elapsedDeletionTime = TimeValue.timeValueNanos(nowNanoSupplier.getAsLong() - startTime); @@ -292,24 +297,29 @@ void deleteSnapshots(Map> snapshotsToDelete, /** * Delete the given snapshot from the repository in blocking manner - * @param repo The repository the snapshot is in - * @param snapshot The snapshot metadata - * @return If present, a SnapshotHistoryItem containing the results of the deletion. Empty if no response or interrupted. + * + * @param repo The repository the snapshot is in + * @param snapshot The snapshot metadata + * @param onCompletion A callback taking info on the history of the snapshot. If present, a SnapshotHistoryItem containing the results + * of the deletion. Empty if interrupted or failed to serialize exception. */ - Optional deleteSnapshot(String slmPolicy, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats) { + void deleteSnapshot(String slmPolicy, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats, + Consumer> onCompletion) { logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, snapshot.snapshotId()); CountDownLatch latch = new CountDownLatch(1); - AtomicReference result = new AtomicReference<>(); client.admin().cluster().prepareDeleteSnapshot(repo, snapshot.snapshotId().getName()) .execute(new LatchedActionListener<>(new ActionListener<>() { @Override public void onResponse(AcknowledgedResponse acknowledgedResponse) { if (acknowledgedResponse.isAcknowledged()) { logger.debug("[{}] snapshot [{}] deleted successfully", repo, snapshot.snapshotId()); - result.set(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(), - snapshot.snapshotId().getName(), slmPolicy, repo)); + onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(), + snapshot.snapshotId().getName(), slmPolicy, repo))); } else { logger.warn("[{}] snapshot [{}] delete issued but the request was not acknowledged", repo, snapshot.snapshotId()); + onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionPossibleSuccessRecord(Instant.now().toEpochMilli(), + snapshot.snapshotId().getName(), slmPolicy, repo, + "deletion request issued successfully, no acknowledgement received"))); } slmStats.snapshotDeleted(slmPolicy); } @@ -318,16 +328,19 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) { public void onFailure(Exception e) { logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention", repo, snapshot.snapshotId()), e); + slmStats.snapshotDeleteFailure(slmPolicy); + SnapshotHistoryItem result; try { - result.set(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), - snapshot.snapshotId().getName(), slmPolicy, repo, e)); + result = SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), + snapshot.snapshotId().getName(), slmPolicy, repo, e); } catch (IOException ex) { // This shouldn't happen unless there's an issue with serializing the original exception logger.error(new ParameterizedMessage( "failed to record snapshot creation failure for snapshot lifecycle policy [{}]", slmPolicy), e); + result = null; } - slmStats.snapshotDeleteFailure(slmPolicy); + onCompletion.accept(Optional.ofNullable(result)); } }, latch)); try { @@ -337,9 +350,9 @@ public void onFailure(Exception e) { } catch (InterruptedException e) { logger.error(new ParameterizedMessage("[{}] deletion of snapshot [{}] interrupted", repo, snapshot.snapshotId()), e); + onCompletion.accept(Optional.empty()); slmStats.snapshotDeleteFailure(slmPolicy); } - return Optional.ofNullable(result.get()); } void updateStateWithStats(SnapshotLifecycleStats newStats) { diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java index 738a1192f086c..cf0297b09438a 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -190,21 +190,20 @@ private void retentionTaskTest(final boolean deletionSuccess) throws Exception { logger.info("--> retrieving snapshots [{}]", snaps); return Collections.singletonMap(repoId, snaps); }, - (deletionPolicyId, repo, snapInfo, slmStats) -> { + (deletionPolicyId, repo, snapInfo, slmStats, onCompletion) -> { logger.info("--> deleting {} from repo {}", snapInfo, repo); deleted.add(snapInfo); deletionLatch.countDown(); if (deletionSuccess) { - return Optional.of(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(), - snapInfo.snapshotId().getName(), policy.getId(), repo)); + onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(), + snapInfo.snapshotId().getName(), policy.getId(), repo))); } else { try { - return Optional.of(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), - snapInfo.snapshotId().getName(), policy.getId(), repo, new RuntimeException("deletion_failed"))); + onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), + snapInfo.snapshotId().getName(), policy.getId(), repo, new RuntimeException("deletion_failed")))); } catch (IOException e) { logger.error(e); fail("failed to serialize an exception to json, this should never happen"); - return Optional.empty(); // impossible to hit this but necessary to make the compiler happy } } }, @@ -289,7 +288,7 @@ private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception logger.info("--> retrieving snapshots [{}]", snaps); return Collections.singletonMap(repoId, snaps); }, - (deletionPolicyId, repo, snapInfo, slmStats) -> { + (deletionPolicyId, repo, snapInfo, slmStats, onCompletion) -> { logger.info("--> deleting {}", snapInfo.snapshotId()); // Don't pause until snapshot 2 if (snapInfo.snapshotId().equals(snap2.snapshotId())) { @@ -299,16 +298,15 @@ private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception deleted.add(snapInfo.snapshotId()); deletionLatch.countDown(); if (deletionSuccess) { - return Optional.of(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(), - snapInfo.snapshotId().getName(), policy.getId(), repo)); + onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(), + snapInfo.snapshotId().getName(), policy.getId(), repo))); } else { try { - return Optional.of(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), - snapInfo.snapshotId().getName(), policy.getId(), repo, new RuntimeException("deletion_failed"))); + onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), + snapInfo.snapshotId().getName(), policy.getId(), repo, new RuntimeException("deletion_failed")))); } catch (IOException e) { logger.error(e); fail("failed to serialize an exception to json, this should never happen"); - return Optional.empty(); // impossible to hit this but necessary to make the compiler happy } } }, @@ -377,13 +375,15 @@ void getAllSuccessfulSnapshots(Collection repositories, } @Override - Optional deleteSnapshot(String policyId, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats) { - return deleteRunner.apply(policyId, repo, snapshot, slmStats); + void deleteSnapshot(String policyId, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats, + Consumer> onCompletion) { + deleteRunner.apply(policyId, repo, snapshot, slmStats, onCompletion); } } @FunctionalInterface interface DeleteSnapshotMock { - Optional apply(String policyId, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats); + void apply(String policyId, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats, + Consumer> onCompletion); } } From 8578d90cc0467964394c454c173c1512e7de824e Mon Sep 17 00:00:00 2001 From: Gordon Brown Date: Wed, 14 Aug 2019 18:14:15 -0600 Subject: [PATCH 06/11] Line length --- .../xpack/core/slm/history/SnapshotHistoryItem.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryItem.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryItem.java index b262fbea49e46..380eaa8a65104 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryItem.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryItem.java @@ -116,7 +116,8 @@ public static SnapshotHistoryItem deletionSuccessRecord(long timestamp, String s return new SnapshotHistoryItem(timestamp, policyId, repository, snapshotName, DELETE_OPERATION, true, null, null); } - public static SnapshotHistoryItem deletionPossibleSuccessRecord(long timestamp, String snapshotName, String policyId, String repository, String details) { + public static SnapshotHistoryItem deletionPossibleSuccessRecord(long timestamp, String snapshotName, String policyId, String repository, + String details) { return new SnapshotHistoryItem(timestamp, policyId, repository, snapshotName, DELETE_OPERATION, true, null, details); } From 94ef3d92b49dacec00e2b1e539d1cc350d7c5f46 Mon Sep 17 00:00:00 2001 From: Gordon Brown Date: Tue, 20 Aug 2019 18:04:44 -0600 Subject: [PATCH 07/11] Consumer->ActionListener --- .../xpack/slm/SnapshotRetentionTask.java | 69 +++++++++---------- .../xpack/slm/SnapshotRetentionTaskTests.java | 43 ++++-------- 2 files changed, 46 insertions(+), 66 deletions(-) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index d2bdbee11010e..9e166ca87f084 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -262,20 +262,32 @@ void deleteSnapshots(Map> snapshotsToDelete, String repo = entry.getKey(); List snapshots = entry.getValue(); for (SnapshotInfo info : snapshots) { - deleteSnapshot(getPolicyId(info), repo, info, slmStats, historyItem -> { - // This would be nicer if we could use ifPresentOrElse - historyItem.ifPresent(item -> { - if (item.isSuccess()) { - deleted.incrementAndGet(); - } else { - failed.incrementAndGet(); - } - historyStore.putAsync(item); - }); - if (historyItem.isEmpty()) { - failed.incrementAndGet(); + final String policyId = getPolicyId(info); + deleteSnapshot(policyId, repo, info, slmStats, ActionListener.wrap(acknowledgedResponse -> { + deleted.incrementAndGet(); + if (acknowledgedResponse.isAcknowledged()) { + historyStore.putAsync(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(), + info.snapshotId().getName(), policyId, repo)); + } else { + SnapshotHistoryItem.deletionPossibleSuccessRecord(Instant.now().toEpochMilli(), + info.snapshotId().getName(), policyId, repo, + "deletion request issued successfully, no acknowledgement received"); + } + }, e -> { + failed.incrementAndGet(); + SnapshotHistoryItem result; + try { + result = SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), + info.snapshotId().getName(), policyId, repo, e); + } catch (IOException ex) { + // This shouldn't happen unless there's an issue with serializing the original exception + logger.error(new ParameterizedMessage( + "failed to record snapshot deletion failure for snapshot lifecycle policy [{}]", + policyId), ex); + return; } - }); + historyStore.putAsync(result); + })); // Check whether we have exceeded the maximum time allowed to spend deleting // snapshots, if we have, short-circuit the rest of the deletions TimeValue elapsedDeletionTime = TimeValue.timeValueNanos(nowNanoSupplier.getAsLong() - startTime); @@ -298,13 +310,13 @@ void deleteSnapshots(Map> snapshotsToDelete, /** * Delete the given snapshot from the repository in blocking manner * - * @param repo The repository the snapshot is in - * @param snapshot The snapshot metadata - * @param onCompletion A callback taking info on the history of the snapshot. If present, a SnapshotHistoryItem containing the results - * of the deletion. Empty if interrupted or failed to serialize exception. + * @param repo The repository the snapshot is in + * @param snapshot The snapshot metadata + * @param listener {@link ActionListener#onResponse(Object)} is called if a {@link SnapshotHistoryItem} can be created representing a + * successful or failed deletion call. {@link ActionListener#onFailure(Exception)} is called only if interrupted. */ void deleteSnapshot(String slmPolicy, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats, - Consumer> onCompletion) { + ActionListener listener) { logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, snapshot.snapshotId()); CountDownLatch latch = new CountDownLatch(1); client.admin().cluster().prepareDeleteSnapshot(repo, snapshot.snapshotId().getName()) @@ -313,14 +325,10 @@ void deleteSnapshot(String slmPolicy, String repo, SnapshotInfo snapshot, Snapsh public void onResponse(AcknowledgedResponse acknowledgedResponse) { if (acknowledgedResponse.isAcknowledged()) { logger.debug("[{}] snapshot [{}] deleted successfully", repo, snapshot.snapshotId()); - onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(), - snapshot.snapshotId().getName(), slmPolicy, repo))); } else { logger.warn("[{}] snapshot [{}] delete issued but the request was not acknowledged", repo, snapshot.snapshotId()); - onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionPossibleSuccessRecord(Instant.now().toEpochMilli(), - snapshot.snapshotId().getName(), slmPolicy, repo, - "deletion request issued successfully, no acknowledgement received"))); } + listener.onResponse(acknowledgedResponse); slmStats.snapshotDeleted(slmPolicy); } @@ -329,18 +337,7 @@ public void onFailure(Exception e) { logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention", repo, snapshot.snapshotId()), e); slmStats.snapshotDeleteFailure(slmPolicy); - SnapshotHistoryItem result; - try { - result = SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), - snapshot.snapshotId().getName(), slmPolicy, repo, e); - } catch (IOException ex) { - // This shouldn't happen unless there's an issue with serializing the original exception - logger.error(new ParameterizedMessage( - "failed to record snapshot creation failure for snapshot lifecycle policy [{}]", - slmPolicy), e); - result = null; - } - onCompletion.accept(Optional.ofNullable(result)); + listener.onFailure(e); } }, latch)); try { @@ -350,7 +347,7 @@ public void onFailure(Exception e) { } catch (InterruptedException e) { logger.error(new ParameterizedMessage("[{}] deletion of snapshot [{}] interrupted", repo, snapshot.snapshotId()), e); - onCompletion.accept(Optional.empty()); + listener.onFailure(e); slmStats.snapshotDeleteFailure(slmPolicy); } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java index cf0297b09438a..79a5065451e59 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.slm; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -28,11 +29,8 @@ import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy; import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyMetadata; import org.elasticsearch.xpack.core.slm.SnapshotRetentionConfiguration; -import org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem; import org.elasticsearch.xpack.core.slm.history.SnapshotHistoryStore; -import java.io.IOException; -import java.time.Instant; import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Arrays; @@ -40,7 +38,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -190,22 +187,15 @@ private void retentionTaskTest(final boolean deletionSuccess) throws Exception { logger.info("--> retrieving snapshots [{}]", snaps); return Collections.singletonMap(repoId, snaps); }, - (deletionPolicyId, repo, snapInfo, slmStats, onCompletion) -> { + (deletionPolicyId, repo, snapInfo, slmStats, listener) -> { logger.info("--> deleting {} from repo {}", snapInfo, repo); deleted.add(snapInfo); - deletionLatch.countDown(); if (deletionSuccess) { - onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(), - snapInfo.snapshotId().getName(), policy.getId(), repo))); + listener.onResponse(new AcknowledgedResponse(true)); } else { - try { - onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), - snapInfo.snapshotId().getName(), policy.getId(), repo, new RuntimeException("deletion_failed")))); - } catch (IOException e) { - logger.error(e); - fail("failed to serialize an exception to json, this should never happen"); - } + listener.onFailure(new RuntimeException("deletion_failed")); } + deletionLatch.countDown(); }, System::nanoTime); @@ -243,7 +233,7 @@ private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception final String policyId = "policy"; final String repoId = "repo"; SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyId, "snap", "1 * * * * ?", - repoId, null, new SnapshotRetentionConfiguration(null, null,1)); + repoId, null, new SnapshotRetentionConfiguration(null, null, 1)); ClusterState state = createState(policy); state = ClusterState.builder(state) @@ -288,7 +278,7 @@ private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception logger.info("--> retrieving snapshots [{}]", snaps); return Collections.singletonMap(repoId, snaps); }, - (deletionPolicyId, repo, snapInfo, slmStats, onCompletion) -> { + (deletionPolicyId, repo, snapInfo, slmStats, listener) -> { logger.info("--> deleting {}", snapInfo.snapshotId()); // Don't pause until snapshot 2 if (snapInfo.snapshotId().equals(snap2.snapshotId())) { @@ -296,19 +286,12 @@ private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception nanos.addAndGet(TimeValue.timeValueMillis(501).nanos()); } deleted.add(snapInfo.snapshotId()); - deletionLatch.countDown(); if (deletionSuccess) { - onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(), - snapInfo.snapshotId().getName(), policy.getId(), repo))); + listener.onResponse(new AcknowledgedResponse(true)); } else { - try { - onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), - snapInfo.snapshotId().getName(), policy.getId(), repo, new RuntimeException("deletion_failed")))); - } catch (IOException e) { - logger.error(e); - fail("failed to serialize an exception to json, this should never happen"); - } + listener.onFailure(new RuntimeException("deletion_failed")); } + deletionLatch.countDown(); }, nanos::get); @@ -376,14 +359,14 @@ void getAllSuccessfulSnapshots(Collection repositories, @Override void deleteSnapshot(String policyId, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats, - Consumer> onCompletion) { - deleteRunner.apply(policyId, repo, snapshot, slmStats, onCompletion); + ActionListener listener) { + deleteRunner.apply(policyId, repo, snapshot, slmStats, listener); } } @FunctionalInterface interface DeleteSnapshotMock { void apply(String policyId, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats, - Consumer> onCompletion); + ActionListener listener); } } From 917ac4eb605f9b37cb32fcd4b4ae56b0180a79ed Mon Sep 17 00:00:00 2001 From: Gordon Brown Date: Tue, 20 Aug 2019 18:05:26 -0600 Subject: [PATCH 08/11] Add `final` --- .../java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index 9e166ca87f084..8e54a39ae4ec8 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -275,7 +275,7 @@ void deleteSnapshots(Map> snapshotsToDelete, } }, e -> { failed.incrementAndGet(); - SnapshotHistoryItem result; + final SnapshotHistoryItem result; try { result = SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), info.snapshotId().getName(), policyId, repo, e); From 65bebe455f987906ec85866af9004ccf1a6ce2f7 Mon Sep 17 00:00:00 2001 From: Gordon Brown Date: Wed, 21 Aug 2019 12:34:29 -0600 Subject: [PATCH 09/11] SnapshotInfo-SnapshotId in deleteSnapshot --- .../xpack/slm/SnapshotRetentionTask.java | 17 +++++++------- .../xpack/slm/SnapshotRetentionTaskTests.java | 22 +++++++++---------- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index 8e54a39ae4ec8..f0aeb6b55c885 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -19,6 +19,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.xpack.core.ClientHelper; @@ -263,7 +264,7 @@ void deleteSnapshots(Map> snapshotsToDelete, List snapshots = entry.getValue(); for (SnapshotInfo info : snapshots) { final String policyId = getPolicyId(info); - deleteSnapshot(policyId, repo, info, slmStats, ActionListener.wrap(acknowledgedResponse -> { + deleteSnapshot(policyId, repo, info.snapshotId(), slmStats, ActionListener.wrap(acknowledgedResponse -> { deleted.incrementAndGet(); if (acknowledgedResponse.isAcknowledged()) { historyStore.putAsync(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(), @@ -315,18 +316,18 @@ void deleteSnapshots(Map> snapshotsToDelete, * @param listener {@link ActionListener#onResponse(Object)} is called if a {@link SnapshotHistoryItem} can be created representing a * successful or failed deletion call. {@link ActionListener#onFailure(Exception)} is called only if interrupted. */ - void deleteSnapshot(String slmPolicy, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats, + void deleteSnapshot(String slmPolicy, String repo, SnapshotId snapshot, SnapshotLifecycleStats slmStats, ActionListener listener) { - logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, snapshot.snapshotId()); + logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, snapshot); CountDownLatch latch = new CountDownLatch(1); - client.admin().cluster().prepareDeleteSnapshot(repo, snapshot.snapshotId().getName()) + client.admin().cluster().prepareDeleteSnapshot(repo, snapshot.getName()) .execute(new LatchedActionListener<>(new ActionListener<>() { @Override public void onResponse(AcknowledgedResponse acknowledgedResponse) { if (acknowledgedResponse.isAcknowledged()) { - logger.debug("[{}] snapshot [{}] deleted successfully", repo, snapshot.snapshotId()); + logger.debug("[{}] snapshot [{}] deleted successfully", repo, snapshot); } else { - logger.warn("[{}] snapshot [{}] delete issued but the request was not acknowledged", repo, snapshot.snapshotId()); + logger.warn("[{}] snapshot [{}] delete issued but the request was not acknowledged", repo, snapshot); } listener.onResponse(acknowledgedResponse); slmStats.snapshotDeleted(slmPolicy); @@ -335,7 +336,7 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) { @Override public void onFailure(Exception e) { logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention", - repo, snapshot.snapshotId()), e); + repo, snapshot), e); slmStats.snapshotDeleteFailure(slmPolicy); listener.onFailure(e); } @@ -346,7 +347,7 @@ public void onFailure(Exception e) { latch.await(); } catch (InterruptedException e) { logger.error(new ParameterizedMessage("[{}] deletion of snapshot [{}] interrupted", - repo, snapshot.snapshotId()), e); + repo, snapshot), e); listener.onFailure(e); slmStats.snapshotDeleteFailure(slmPolicy); } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java index 79a5065451e59..f3be1b39f486a 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -162,7 +162,7 @@ private void retentionTaskTest(final boolean deletionSuccess) throws Exception { System.currentTimeMillis(), null, System.currentTimeMillis() + 1, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); - Set deleted = ConcurrentHashMap.newKeySet(); + Set deleted = ConcurrentHashMap.newKeySet(); Set deletedSnapshotsInHistory = ConcurrentHashMap.newKeySet(); CountDownLatch deletionLatch = new CountDownLatch(1); CountDownLatch historyLatch = new CountDownLatch(1); @@ -187,9 +187,9 @@ private void retentionTaskTest(final boolean deletionSuccess) throws Exception { logger.info("--> retrieving snapshots [{}]", snaps); return Collections.singletonMap(repoId, snaps); }, - (deletionPolicyId, repo, snapInfo, slmStats, listener) -> { - logger.info("--> deleting {} from repo {}", snapInfo, repo); - deleted.add(snapInfo); + (deletionPolicyId, repo, snapId, slmStats, listener) -> { + logger.info("--> deleting {} from repo {}", snapId, repo); + deleted.add(snapId); if (deletionSuccess) { listener.onResponse(new AcknowledgedResponse(true)); } else { @@ -206,7 +206,7 @@ private void retentionTaskTest(final boolean deletionSuccess) throws Exception { assertThat("something should have been deleted", deleted, not(empty())); assertThat("one snapshot should have been deleted", deleted, hasSize(1)); - assertThat(deleted, contains(eligibleSnapshot)); + assertThat(deleted, contains(eligibleSnapshot.snapshotId())); boolean historySuccess = historyLatch.await(10, TimeUnit.SECONDS); assertThat("expected history entries for 1 snapshot deletions", historySuccess, equalTo(true)); @@ -278,14 +278,14 @@ private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception logger.info("--> retrieving snapshots [{}]", snaps); return Collections.singletonMap(repoId, snaps); }, - (deletionPolicyId, repo, snapInfo, slmStats, listener) -> { - logger.info("--> deleting {}", snapInfo.snapshotId()); + (deletionPolicyId, repo, snapId, slmStats, listener) -> { + logger.info("--> deleting {}", snapId); // Don't pause until snapshot 2 - if (snapInfo.snapshotId().equals(snap2.snapshotId())) { + if (snapId.equals(snap2.snapshotId())) { logger.info("--> pausing for 501ms while deleting snap2 to simulate deletion past a threshold"); nanos.addAndGet(TimeValue.timeValueMillis(501).nanos()); } - deleted.add(snapInfo.snapshotId()); + deleted.add(snapId); if (deletionSuccess) { listener.onResponse(new AcknowledgedResponse(true)); } else { @@ -358,7 +358,7 @@ void getAllSuccessfulSnapshots(Collection repositories, } @Override - void deleteSnapshot(String policyId, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats, + void deleteSnapshot(String policyId, String repo, SnapshotId snapshot, SnapshotLifecycleStats slmStats, ActionListener listener) { deleteRunner.apply(policyId, repo, snapshot, slmStats, listener); } @@ -366,7 +366,7 @@ void deleteSnapshot(String policyId, String repo, SnapshotInfo snapshot, Snapsho @FunctionalInterface interface DeleteSnapshotMock { - void apply(String policyId, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats, + void apply(String policyId, String repo, SnapshotId snapshot, SnapshotLifecycleStats slmStats, ActionListener listener); } } From dc4e46a0d56bdd0a50a0d2882c34a2cefb11fd1f Mon Sep 17 00:00:00 2001 From: Gordon Brown Date: Wed, 21 Aug 2019 12:35:23 -0600 Subject: [PATCH 10/11] Move putAsync inside try --- .../org/elasticsearch/xpack/slm/SnapshotRetentionTask.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index f0aeb6b55c885..fa292e14fc8f8 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -276,18 +276,16 @@ void deleteSnapshots(Map> snapshotsToDelete, } }, e -> { failed.incrementAndGet(); - final SnapshotHistoryItem result; try { - result = SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), + final SnapshotHistoryItem result = SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(), info.snapshotId().getName(), policyId, repo, e); + historyStore.putAsync(result); } catch (IOException ex) { // This shouldn't happen unless there's an issue with serializing the original exception logger.error(new ParameterizedMessage( "failed to record snapshot deletion failure for snapshot lifecycle policy [{}]", policyId), ex); - return; } - historyStore.putAsync(result); })); // Check whether we have exceeded the maximum time allowed to spend deleting // snapshots, if we have, short-circuit the rest of the deletions From e1e22b88a76583f99b530f6cfe153a8798d74d84 Mon Sep 17 00:00:00 2001 From: Gordon Brown Date: Wed, 21 Aug 2019 13:41:30 -0600 Subject: [PATCH 11/11] Fix typo in test name --- .../elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java index f3be1b39f486a..86e7a34214bbb 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -135,11 +135,11 @@ public void testSnapshotEligibleForDeletion() { assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(false)); } - public void testRentionTaskSuccess() throws Exception { + public void testRetentionTaskSuccess() throws Exception { retentionTaskTest(true); } - public void testRentionTaskFailure() throws Exception { + public void testRetentionTaskFailure() throws Exception { retentionTaskTest(false); }