From 45278241336652dd23b452270146b5eeaec396dc Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 25 Sep 2019 14:24:54 -0600 Subject: [PATCH] Wait for snapshot completion in SLM snapshot invocation (#47051) * Wait for snapshot completion in SLM snapshot invocation This changes the snapshots internally invoked by SLM to wait for completion. This allows us to capture more snapshotting failure scenarios. For example, previously a snapshot would be created and then registered as a "success", however, the snapshot may have been aborted, or it may have had a subset of its shards fail. These cases are now handled by inspecting the response to the `CreateSnapshotRequest` and ensuring that there are no failures. If any failures are present, the history store now stores the action as a failure instead of a success. Relates to #38461 and #43663 --- .../create/CreateSnapshotResponse.java | 2 +- .../core/slm/SnapshotLifecyclePolicy.java | 2 +- .../xpack/ilm/IndexLifecycle.java | 3 +- .../xpack/slm/SnapshotLifecycleTask.java | 30 +++++-- .../xpack/slm/SnapshotRetentionTask.java | 15 ++-- .../xpack/slm/SnapshotLifecycleTaskTests.java | 82 +++++++++++++++++++ 6 files changed, 119 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotResponse.java index a63f2cf810475..b16959e9d18de 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotResponse.java @@ -53,7 +53,7 @@ public class CreateSnapshotResponse extends ActionResponse implements ToXContent CreateSnapshotResponse() {} - CreateSnapshotResponse(@Nullable SnapshotInfo snapshotInfo) { + public CreateSnapshotResponse(@Nullable SnapshotInfo snapshotInfo) { this.snapshotInfo = snapshotInfo; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/SnapshotLifecyclePolicy.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/SnapshotLifecyclePolicy.java index e038d3bb6e3bf..0ef7912c58017 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/SnapshotLifecyclePolicy.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/SnapshotLifecyclePolicy.java @@ -260,7 +260,7 @@ public CreateSnapshotRequest toRequest() { Map mergedConfiguration = new HashMap<>(configuration); mergedConfiguration.put("metadata", metadataWithAddedPolicyName); req.source(mergedConfiguration); - req.waitForCompletion(false); + req.waitForCompletion(true); return req; } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java index 74501b5965758..39e27f2a5afe5 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java @@ -146,7 +146,8 @@ public List> getSettings() { LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING, RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING, LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING, - LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING); + LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING, + LifecycleSettings.SLM_RETENTION_DURATION_SETTING); } @Override diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java index 993dac40252e6..4c64437946aea 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java @@ -23,6 +23,8 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.snapshots.SnapshotException; +import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.slm.SnapshotInvocationRecord; @@ -91,16 +93,32 @@ public static Optional maybeTakeSnapshot(final String jobId, final Clien public void onResponse(CreateSnapshotResponse createSnapshotResponse) { logger.debug("snapshot response for [{}]: {}", policyMetadata.getPolicy().getId(), Strings.toString(createSnapshotResponse)); - final long timestamp = Instant.now().toEpochMilli(); - clusterService.submitStateUpdateTask("slm-record-success-" + policyMetadata.getPolicy().getId(), - WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), timestamp)); - historyStore.putAsync(SnapshotHistoryItem.creationSuccessRecord(timestamp, policyMetadata.getPolicy(), - request.snapshot())); + final SnapshotInfo snapInfo = createSnapshotResponse.getSnapshotInfo(); + + // Check that there are no failed shards, since the request may not entirely + // fail, but may still have failures (such as in the case of an aborted snapshot) + if (snapInfo.failedShards() == 0) { + final long timestamp = Instant.now().toEpochMilli(); + clusterService.submitStateUpdateTask("slm-record-success-" + policyMetadata.getPolicy().getId(), + WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), timestamp)); + historyStore.putAsync(SnapshotHistoryItem.creationSuccessRecord(timestamp, policyMetadata.getPolicy(), + request.snapshot())); + } else { + int failures = snapInfo.failedShards(); + int total = snapInfo.totalShards(); + final SnapshotException e = new SnapshotException(request.repository(), request.snapshot(), + "failed to create snapshot successfully, " + failures + " out of " + total + " total shards failed"); + // Add each failed shard's exception as suppressed, the exception contains + // information about which shard failed + snapInfo.shardFailures().forEach(failure -> e.addSuppressed(failure.getCause())); + // Call the failure handler to register this as a failure and persist it + onFailure(e); + } } @Override public void onFailure(Exception e) { - logger.error("failed to issue create snapshot request for snapshot lifecycle policy [{}]: {}", + logger.error("failed to create snapshot for snapshot lifecycle policy [{}]: {}", policyMetadata.getPolicy().getId(), e); final long timestamp = Instant.now().toEpochMilli(); clusterService.submitStateUpdateTask("slm-record-failure-" + policyMetadata.getPolicy().getId(), diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index 27c40fbe15ca9..d19707a2d8bda 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -324,6 +324,7 @@ void deleteSnapshots(Map> snapshotsToDelete, List snapshots = entry.getValue(); for (SnapshotInfo info : snapshots) { final String policyId = getPolicyId(info); + final long deleteStartTime = nowNanoSupplier.getAsLong(); deleteSnapshot(policyId, repo, info.snapshotId(), slmStats, ActionListener.wrap(acknowledgedResponse -> { deleted.incrementAndGet(); if (acknowledgedResponse.isAcknowledged()) { @@ -349,13 +350,15 @@ void deleteSnapshots(Map> snapshotsToDelete, })); // Check whether we have exceeded the maximum time allowed to spend deleting // snapshots, if we have, short-circuit the rest of the deletions - TimeValue elapsedDeletionTime = TimeValue.timeValueNanos(nowNanoSupplier.getAsLong() - startTime); - logger.debug("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), elapsedDeletionTime); - if (elapsedDeletionTime.compareTo(maximumTime) > 0) { + long finishTime = nowNanoSupplier.getAsLong(); + TimeValue deletionTime = TimeValue.timeValueNanos(finishTime - deleteStartTime); + logger.debug("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), deletionTime); + TimeValue totalDeletionTime = TimeValue.timeValueNanos(finishTime - startTime); + if (totalDeletionTime.compareTo(maximumTime) > 0) { logger.info("maximum snapshot retention deletion time reached, time spent: [{}]," + " maximum allowed time: [{}], deleted [{}] out of [{}] snapshots scheduled for deletion, failed to delete [{}]", - elapsedDeletionTime, maximumTime, deleted, count, failed); - slmStats.deletionTime(elapsedDeletionTime); + totalDeletionTime, maximumTime, deleted, count, failed); + slmStats.deletionTime(totalDeletionTime); slmStats.retentionTimedOut(); return; } @@ -387,8 +390,8 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) { } else { logger.warn("[{}] snapshot [{}] delete issued but the request was not acknowledged", repo, snapshot); } - listener.onResponse(acknowledgedResponse); slmStats.snapshotDeleted(slmPolicy); + listener.onResponse(acknowledgedResponse); } @Override diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTaskTests.java index 84c1d12cce65e..5474602cdfda6 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTaskTests.java @@ -23,6 +23,10 @@ import org.elasticsearch.common.TriFunction; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotShardFailure; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; @@ -47,6 +51,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.startsWith; @@ -196,6 +201,83 @@ public void testCreateSnapshotOnTrigger() { threadPool.shutdownNow(); } + public void testPartialFailureSnapshot() throws Exception { + final String id = randomAlphaOfLength(4); + final SnapshotLifecyclePolicyMetadata slpm = makePolicyMeta(id); + final SnapshotLifecycleMetadata meta = + new SnapshotLifecycleMetadata(Collections.singletonMap(id, slpm), OperationMode.RUNNING, new SnapshotLifecycleStats()); + + final ClusterState state = ClusterState.builder(new ClusterName("test")) + .metaData(MetaData.builder() + .putCustom(SnapshotLifecycleMetadata.TYPE, meta) + .build()) + .build(); + + final ThreadPool threadPool = new TestThreadPool("test"); + final AtomicBoolean clientCalled = new AtomicBoolean(false); + final SetOnce snapshotName = new SetOnce<>(); + try (ClusterService clusterService = ClusterServiceUtils.createClusterService(state, threadPool); + VerifyingClient client = new VerifyingClient(threadPool, + (action, request, listener) -> { + assertFalse(clientCalled.getAndSet(true)); + assertThat(action, instanceOf(CreateSnapshotAction.class)); + assertThat(request, instanceOf(CreateSnapshotRequest.class)); + + CreateSnapshotRequest req = (CreateSnapshotRequest) request; + + SnapshotLifecyclePolicy policy = slpm.getPolicy(); + assertThat(req.snapshot(), startsWith(policy.getName() + "-")); + assertThat(req.repository(), equalTo(policy.getRepository())); + snapshotName.set(req.snapshot()); + if (req.indices().length > 0) { + assertThat(Arrays.asList(req.indices()), equalTo(policy.getConfig().get("indices"))); + } + boolean globalState = policy.getConfig().get("include_global_state") == null || + Boolean.parseBoolean((String) policy.getConfig().get("include_global_state")); + assertThat(req.includeGlobalState(), equalTo(globalState)); + + return new CreateSnapshotResponse( + new SnapshotInfo( + new SnapshotId(req.snapshot(), "uuid"), + Arrays.asList(req.indices()), + randomNonNegativeLong(), + "snapshot started", + randomNonNegativeLong(), + 3, + Collections.singletonList( + new SnapshotShardFailure("nodeId", new ShardId("index", "uuid", 0), "forced failure")), + req.includeGlobalState(), + req.userMetadata() + )); + })) { + final AtomicBoolean historyStoreCalled = new AtomicBoolean(false); + SnapshotHistoryStore historyStore = new VerifyingHistoryStore(null, ZoneOffset.UTC, + item -> { + assertFalse(historyStoreCalled.getAndSet(true)); + final SnapshotLifecyclePolicy policy = slpm.getPolicy(); + assertEquals(policy.getId(), item.getPolicyId()); + assertEquals(policy.getRepository(), item.getRepository()); + assertEquals(policy.getConfig(), item.getSnapshotConfiguration()); + assertEquals(snapshotName.get(), item.getSnapshotName()); + assertFalse("item should be a failure", item.isSuccess()); + assertThat(item.getErrorDetails(), + containsString("failed to create snapshot successfully, 1 out of 3 total shards failed")); + assertThat(item.getErrorDetails(), + containsString("forced failure")); + }); + + SnapshotLifecycleTask task = new SnapshotLifecycleTask(client, clusterService, historyStore); + // Trigger the event with a matching job name for the policy + task.triggered(new SchedulerEngine.Event(SnapshotLifecycleService.getJobId(slpm), + System.currentTimeMillis(), System.currentTimeMillis())); + + assertTrue("snapshot should be triggered once", clientCalled.get()); + assertTrue("history store should be called once", historyStoreCalled.get()); + } + + threadPool.shutdownNow(); + } + /** * A client that delegates to a verifying function for action/request/listener */