From 51100e2ac697296e64875e1637ed8826c491d7d6 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 16 Aug 2019 14:55:25 +0200 Subject: [PATCH 1/7] Make Snapshot Logic Write Metadata after Segments * WIP, this still needs a docs fixup * Fixes #41581 * Fixes #25281 --- .../repositories/FilterRepository.java | 9 +- .../repositories/Repository.java | 13 +- .../blobstore/BlobStoreRepository.java | 52 +++--- .../blobstore/ChecksumBlobStoreFormat.java | 11 +- .../repositories/blobstore/package-info.java | 10 +- .../snapshots/SnapshotsService.java | 46 +++-- .../RepositoriesServiceTests.java | 7 +- .../snapshots/BlobStoreFormatIT.java | 10 +- .../DedicatedClusterSnapshotRestoreIT.java | 1 - .../SharedClusterSnapshotRestoreIT.java | 171 +++--------------- .../snapshots/SnapshotResiliencyTests.java | 81 +++++++++ .../MockEventuallyConsistentRepository.java | 6 +- ...ckEventuallyConsistentRepositoryTests.java | 7 +- .../snapshots/mockstore/MockRepository.java | 20 +- .../index/shard/RestoreOnlyRepository.java | 6 +- .../xpack/ccr/repository/CcrRepository.java | 7 +- .../SourceOnlySnapshotRepository.java | 8 +- .../SourceOnlySnapshotShardTests.java | 5 - 18 files changed, 198 insertions(+), 272 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 8c9eff0698835..28e36a7ae9bd8 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -73,17 +73,12 @@ public RepositoryData getRepositoryData() { return in.getRepositoryData(); } - @Override - public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { - in.initializeSnapshot(snapshotId, indices, metaData); - } - @Override public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, - Map userMetadata) { + MetaData metaData, Map userMetadata) { return in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId, - includeGlobalState, userMetadata); + includeGlobalState, metaData, userMetadata); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 788459b16c540..6a67a33d6af18 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -48,8 +48,6 @@ *

* To perform a snapshot: *

    - *
  • Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)} - * with list of indices that will be included into the snapshot
  • *
  • Data nodes call {@link Repository#snapshotShard(Store, MapperService, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)} * for each shard
  • *
  • When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures
  • @@ -110,15 +108,6 @@ default Repository create(RepositoryMetaData metaData, Function indices, MetaData metaData); - /** * Finalizes snapshotting process *

    @@ -136,7 +125,7 @@ default Repository create(RepositoryMetaData metaData, Function indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, - Map userMetadata); + MetaData clusterMetaData, Map userMetadata); /** * Deletes snapshot diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 947fdd4dfa952..bab6e4b924ba6 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -84,7 +84,6 @@ import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryVerificationException; -import org.elasticsearch.snapshots.InvalidSnapshotNameException; import org.elasticsearch.snapshots.SnapshotCreationException; import org.elasticsearch.snapshots.SnapshotException; import org.elasticsearch.snapshots.SnapshotId; @@ -139,7 +138,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private static final String TESTS_FILE = "tests-"; - private static final String METADATA_PREFIX = "meta-"; + public static final String METADATA_PREFIX = "meta-"; public static final String METADATA_NAME_FORMAT = METADATA_PREFIX + "%s.dat"; @@ -353,31 +352,6 @@ public RepositoryMetaData getMetadata() { return metadata; } - @Override - public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData clusterMetaData) { - if (isReadOnly()) { - throw new RepositoryException(metadata.name(), "cannot create snapshot in a readonly repository"); - } - try { - final String snapshotName = snapshotId.getName(); - // check if the snapshot name already exists in the repository - final RepositoryData repositoryData = getRepositoryData(); - if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) { - throw new InvalidSnapshotNameException(metadata.name(), snapshotId.getName(), "snapshot with the same name already exists"); - } - - // Write Global MetaData - globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID()); - - // write the index metadata for each index in the snapshot - for (IndexId index : indices) { - indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID()); - } - } catch (IOException ex) { - throw new SnapshotCreationException(metadata.name(), snapshotId, ex); - } - } - @Override public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { if (isReadOnly()) { @@ -558,14 +532,34 @@ public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId, final List shardFailures, final long repositoryStateId, final boolean includeGlobalState, + final MetaData clusterMetaData, final Map userMetadata) { SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId, indices.stream().map(IndexId::getName).collect(Collectors.toList()), startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures, includeGlobalState, userMetadata); + + try { + // We ignore all FileAlreadyExistsException here since otherwise a master failover while in this method will + // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the + // index or global metadata will be compatible with the segments written in this snapshot as well. + // Failing on an already existing snap-${uuid}.dat below ensures that the index.latest blob is not updated in a way that + // decrements the generation it points at + + // Write Global MetaData + globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), false); + + // write the index metadata for each index in the snapshot + for (IndexId index : indices) { + indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false); + } + } catch (IOException ex) { + throw new SnapshotCreationException(metadata.name(), snapshotId, ex); + } + try { final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices); - snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID()); + snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID(), false); writeIndexGen(updatedRepositoryData, repositoryStateId); } catch (FileAlreadyExistsException ex) { // if another master was elected and took over finalizing the snapshot, it is possible @@ -941,7 +935,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); try { - indexShardSnapshotFormat.write(snapshot, shardContainer, snapshotId.getUUID()); + indexShardSnapshotFormat.write(snapshot, shardContainer, snapshotId.getUUID(), true); } catch (IOException e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index bd2b4900ece71..4d8a25ca8d50d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -175,15 +175,16 @@ public void writeAtomic(T obj, BlobContainer blobContainer, String name) throws *

    * The blob will be compressed and checksum will be written if required. * - * @param obj object to be serialized - * @param blobContainer blob container - * @param name blob name + * @param obj object to be serialized + * @param blobContainer blob container + * @param name blob name + * @param failIfAlreadyExists Whether to fail if the blob already exists */ - public void write(T obj, BlobContainer blobContainer, String name) throws IOException { + public void write(T obj, BlobContainer blobContainer, String name, boolean failIfAlreadyExists) throws IOException { final String blobName = blobName(name); writeTo(obj, blobName, bytesArray -> { try (InputStream stream = bytesArray.streamInput()) { - blobContainer.writeBlob(blobName, stream, bytesArray.length(), true); + blobContainer.writeBlob(blobName, stream, bytesArray.length(), failIfAlreadyExists); } }); } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java index 0a66df3cf8521..7069ae193ca13 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java @@ -120,15 +120,7 @@ * *

    Initializing a Snapshot in the Repository

    * - *

    Creating a snapshot in the repository starts with a call to {@link org.elasticsearch.repositories.Repository#initializeSnapshot} which - * the blob store repository implements via the following actions:

    - *
      - *
    1. Verify that no snapshot by the requested name exists.
    2. - *
    3. Write a blob containing the cluster metadata to the root of the blob store repository at {@code /meta-${snapshot-uuid}.dat}
    4. - *
    5. Write the metadata for each index to a blob in that index's directory at - * {@code /indices/${index-snapshot-uuid}/meta-${snapshot-uuid}.dat}
    6. - *
    - * TODO: This behavior is problematic, adjust these docs once https://github.com/elastic/elasticsearch/issues/41581 is fixed + * TODO: Adjust these docs for https://github.com/elastic/elasticsearch/issues/41581 * *

    Writing Shard Data (Segments)

    * diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 85497ad60c6e6..a39edac42a0ff 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -67,6 +67,7 @@ import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.threadpool.ThreadPool; @@ -99,7 +100,7 @@ *
  • On the master node the {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} is called and makes sure that * no snapshot is currently running and registers the new snapshot in cluster state
  • *
  • When cluster state is updated - * the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, ActionListener)} method kicks in and initializes + * the {@link #beginSnapshot(SnapshotsInProgress.Entry, boolean, ActionListener)} method kicks in and initializes * the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state
  • *
  • Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes * start processing them through {@link SnapshotShardsService#processIndexShardSnapshots(SnapshotsInProgress)} method
  • @@ -303,7 +304,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl if (newSnapshot != null) { final Snapshot current = newSnapshot.snapshot(); assert initializingSnapshots.contains(current); - beginSnapshot(newState, newSnapshot, request.partial(), new ActionListener<>() { + beginSnapshot(newSnapshot, request.partial(), new ActionListener<>() { @Override public void onResponse(final Snapshot snapshot) { initializingSnapshots.remove(snapshot); @@ -372,14 +373,11 @@ private static void validate(final String repositoryName, final String snapshotN *

    * Creates snapshot in repository and updates snapshot metadata record with list of shards that needs to be processed. * - * @param clusterState cluster state * @param snapshot snapshot meta data * @param partial allow partial snapshots * @param userCreateSnapshotListener listener */ - private void beginSnapshot(final ClusterState clusterState, - final SnapshotsInProgress.Entry snapshot, - final boolean partial, + private void beginSnapshot(final SnapshotsInProgress.Entry snapshot, final boolean partial, final ActionListener userCreateSnapshotListener) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { @@ -392,17 +390,16 @@ protected void doRun() { assert initializingSnapshots.contains(snapshot.snapshot()); Repository repository = repositoriesService.repository(snapshot.snapshot().getRepository()); - MetaData metaData = clusterState.metaData(); - if (!snapshot.includeGlobalState()) { - // Remove global state from the cluster state - MetaData.Builder builder = MetaData.builder(); - for (IndexId index : snapshot.indices()) { - builder.put(metaData.index(index.getName()), false); - } - metaData = builder.build(); + final String snapshotName = snapshot.snapshot().getSnapshotId().getName(); + if (repository.isReadOnly()) { + throw new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository"); + } + // check if the snapshot name already exists in the repository + final RepositoryData repositoryData = repository.getRepositoryData(); + if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) { + throw new InvalidSnapshotNameException( + repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists"); } - - repository.initializeSnapshot(snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaData); snapshotCreated = true; logger.info("snapshot [{}] started", snapshot.snapshot()); @@ -541,6 +538,9 @@ public void onNoLongerMaster() { private void cleanupAfterError(Exception exception) { if(snapshotCreated) { try { + // TODO: Get the metadata from cluster state update to be safe? + final MetaData metaData = metaDataForSnapshot(snapshot); + repositoriesService.repository(snapshot.snapshot().getRepository()) .finalizeSnapshot(snapshot.snapshot().getSnapshotId(), snapshot.indices(), @@ -550,6 +550,7 @@ private void cleanupAfterError(Exception exception) { Collections.emptyList(), snapshot.getRepositoryStateId(), snapshot.includeGlobalState(), + metaData, snapshot.userMetadata()); } catch (Exception inner) { inner.addSuppressed(exception); @@ -559,7 +560,19 @@ private void cleanupAfterError(Exception exception) { } userCreateSnapshotListener.onFailure(e); } + } + private MetaData metaDataForSnapshot(final SnapshotsInProgress.Entry snapshot) { + MetaData metaData = clusterService.state().metaData(); + if (!snapshot.includeGlobalState()) { + // Remove global state from the cluster state + MetaData.Builder builder = MetaData.builder(); + for (IndexId index : snapshot.indices()) { + builder.put(metaData.index(index.getName()), false); + } + metaData = builder.build(); + } + return metaData; } private static SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) { @@ -982,6 +995,7 @@ protected void doRun() { unmodifiableList(shardFailures), entry.getRepositoryStateId(), entry.includeGlobalState(), + metaDataForSnapshot(entry), entry.userMetadata()); removeSnapshotFromClusterState(snapshot, snapshotInfo, null); logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index d79763a9f6eab..37c3f2940a64e 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -153,15 +153,10 @@ public RepositoryData getRepositoryData() { return null; } - @Override - public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { - - } - @Override public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, - boolean includeGlobalState, Map userMetadata) { + boolean includeGlobalState, MetaData metaData, Map userMetadata) { return null; } diff --git a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java index 882b3cc4b1e86..c00760899c4d2 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java @@ -116,8 +116,8 @@ public void testBlobStoreOperations() throws IOException { xContentRegistry(), true); // Write blobs in different formats - checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile"); - checksumSMILECompressed.write(new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp"); + checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", true); + checksumSMILECompressed.write(new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp", true); // Assert that all checksum blobs can be read by all formats assertEquals(checksumSMILE.read(blobContainer, "check-smile").getText(), "checksum smile"); @@ -136,8 +136,8 @@ public void testCompressionIsApplied() throws IOException { ChecksumBlobStoreFormat checksumFormatComp = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent, xContentRegistry(), true); BlobObj blobObj = new BlobObj(veryRedundantText.toString()); - checksumFormatComp.write(blobObj, blobContainer, "blob-comp"); - checksumFormat.write(blobObj, blobContainer, "blob-not-comp"); + checksumFormatComp.write(blobObj, blobContainer, "blob-comp", true); + checksumFormat.write(blobObj, blobContainer, "blob-not-comp", true); Map blobs = blobContainer.listBlobsByPrefix("blob-"); assertEquals(blobs.size(), 2); assertThat(blobs.get("blob-not-comp").length(), greaterThan(blobs.get("blob-comp").length())); @@ -150,7 +150,7 @@ public void testBlobCorruption() throws IOException { BlobObj blobObj = new BlobObj(testString); ChecksumBlobStoreFormat checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent, xContentRegistry(), randomBoolean()); - checksumFormat.write(blobObj, blobContainer, "test-path"); + checksumFormat.write(blobObj, blobContainer, "test-path", true); assertEquals(checksumFormat.read(blobContainer, "test-path").getText(), testString); randomCorruption(blobContainer, "test-path"); try { diff --git a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 6538ccf40e735..c13a254922ffa 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -928,7 +928,6 @@ public void testMasterAndDataShutdownDuringSnapshot() throws Exception { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/25281") public void testMasterShutdownDuringFailedSnapshot() throws Exception { logger.info("--> starting two master nodes and two data nodes"); internalCluster().startMasterOnlyNodes(2); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 6ef892a74bb5a..75c2e1759434a 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -47,7 +47,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; @@ -110,7 +109,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -2538,28 +2536,15 @@ public void testCloseOrDeleteIndexDuringSnapshot() throws Exception { Client client = client(); - boolean allowPartial = randomBoolean(); logger.info("--> creating repository"); - // only block on repo init if we have partial snapshot or we run into deadlock when acquiring shard locks for index deletion/closing - boolean initBlocking = allowPartial || randomBoolean(); - if (initBlocking) { - assertAcked(client.admin().cluster().preparePutRepository("test-repo") - .setType("mock").setSettings(Settings.builder() - .put("location", randomRepoPath()) - .put("compress", randomBoolean()) - .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES) - .put("block_on_init", true) - )); - } else { - assertAcked(client.admin().cluster().preparePutRepository("test-repo") - .setType("mock").setSettings(Settings.builder() - .put("location", randomRepoPath()) - .put("compress", randomBoolean()) - .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES) - .put("block_on_data", true) - )); - } + assertAcked(client.admin().cluster().preparePutRepository("test-repo") + .setType("mock").setSettings(Settings.builder() + .put("location", randomRepoPath()) + .put("compress", randomBoolean()) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put("block_on_data", true))); + createIndex("test-idx-1", "test-idx-2", "test-idx-3"); ensureGreen(); @@ -2575,70 +2560,40 @@ public void testCloseOrDeleteIndexDuringSnapshot() throws Exception { assertThat(client.prepareSearch("test-idx-2").setSize(0).get().getHits().getTotalHits().value, equalTo(100L)); assertThat(client.prepareSearch("test-idx-3").setSize(0).get().getHits().getTotalHits().value, equalTo(100L)); - logger.info("--> snapshot allow partial {}", allowPartial); + logger.info("--> snapshot"); ActionFuture future = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") - .setIndices("test-idx-*").setWaitForCompletion(true).setPartial(allowPartial).execute(); + .setIndices("test-idx-*").setWaitForCompletion(true).setPartial(false).execute(); logger.info("--> wait for block to kick in"); - if (initBlocking) { - waitForBlock(internalCluster().getMasterName(), "test-repo", TimeValue.timeValueMinutes(1)); - } else { - waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1)); - } - boolean closedOnPartial = false; + waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1)); + try { - if (allowPartial) { - // partial snapshots allow close / delete operations - if (randomBoolean()) { - logger.info("--> delete index while partial snapshot is running"); + // non-partial snapshots do not allow close / delete operations on indices where snapshot has not been completed + if (randomBoolean()) { + try { + logger.info("--> delete index while non-partial snapshot is running"); client.admin().indices().prepareDelete("test-idx-1").get(); - } else { - logger.info("--> close index while partial snapshot is running"); - closedOnPartial = true; - client.admin().indices().prepareClose("test-idx-1").get(); + fail("Expected deleting index to fail during snapshot"); + } catch (SnapshotInProgressException e) { + assertThat(e.getMessage(), containsString("Cannot delete indices that are being snapshotted: [[test-idx-1/")); } } else { - // non-partial snapshots do not allow close / delete operations on indices where snapshot has not been completed - if (randomBoolean()) { - try { - logger.info("--> delete index while non-partial snapshot is running"); - client.admin().indices().prepareDelete("test-idx-1").get(); - fail("Expected deleting index to fail during snapshot"); - } catch (SnapshotInProgressException e) { - assertThat(e.getMessage(), containsString("Cannot delete indices that are being snapshotted: [[test-idx-1/")); - } - } else { - try { - logger.info("--> close index while non-partial snapshot is running"); - client.admin().indices().prepareClose("test-idx-1").get(); - fail("Expected closing index to fail during snapshot"); - } catch (SnapshotInProgressException e) { - assertThat(e.getMessage(), containsString("Cannot close indices that are being snapshotted: [[test-idx-1/")); - } + try { + logger.info("--> close index while non-partial snapshot is running"); + client.admin().indices().prepareClose("test-idx-1").get(); + fail("Expected closing index to fail during snapshot"); + } catch (SnapshotInProgressException e) { + assertThat(e.getMessage(), containsString("Cannot close indices that are being snapshotted: [[test-idx-1/")); } } } finally { - if (initBlocking) { - logger.info("--> unblock running master node"); - unblockNode("test-repo", internalCluster().getMasterName()); - } else { - logger.info("--> unblock all data nodes"); - unblockAllDataNodes("test-repo"); - } + logger.info("--> unblock all data nodes"); + unblockAllDataNodes("test-repo"); } logger.info("--> waiting for snapshot to finish"); CreateSnapshotResponse createSnapshotResponse = future.get(); - if (allowPartial && closedOnPartial == false) { - logger.info("Deleted/Closed index during snapshot, but allow partial"); - assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.PARTIAL))); - assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); - assertThat(createSnapshotResponse.getSnapshotInfo().failedShards(), greaterThan(0)); - assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), - lessThan(createSnapshotResponse.getSnapshotInfo().totalShards())); - } else { - logger.info("Snapshot successfully completed"); - assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.SUCCESS))); - } + logger.info("Snapshot successfully completed"); + assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.SUCCESS))); } public void testCloseIndexDuringRestore() throws Exception { @@ -3856,76 +3811,6 @@ public void testParallelRestoreOperationsFromSingleSnapshot() throws Exception { assertThat(client.prepareGet(restoredIndexName2, typeName, sameSourceIndex ? docId : docId2).get().isExists(), equalTo(true)); } - public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception { - final Client client = client(); - - // Blocks on initialization - assertAcked(client.admin().cluster().preparePutRepository("repository") - .setType("mock").setSettings(Settings.builder() - .put("location", randomRepoPath()) - .put("block_on_init", true) - )); - - createIndex("test-idx"); - final int nbDocs = scaledRandomIntBetween(100, 500); - for (int i = 0; i < nbDocs; i++) { - index("test-idx", "_doc", Integer.toString(i), "foo", "bar" + i); - } - flushAndRefresh("test-idx"); - assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo((long) nbDocs)); - - // Create a snapshot - client.admin().cluster().prepareCreateSnapshot("repository", "snap").execute(); - waitForBlock(internalCluster().getMasterName(), "repository", TimeValue.timeValueMinutes(1)); - boolean blocked = true; - - // Snapshot is initializing (and is blocked at this stage) - SnapshotsStatusResponse snapshotsStatus = client.admin().cluster().prepareSnapshotStatus("repository").setSnapshots("snap").get(); - assertThat(snapshotsStatus.getSnapshots().iterator().next().getState(), equalTo(State.INIT)); - - final List states = new CopyOnWriteArrayList<>(); - final ClusterStateListener listener = event -> { - SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); - for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { - if ("snap".equals(entry.snapshot().getSnapshotId().getName())) { - states.add(entry.state()); - } - } - }; - - try { - // Record the upcoming states of the snapshot on all nodes - internalCluster().getInstances(ClusterService.class).forEach(clusterService -> clusterService.addListener(listener)); - - // Delete the snapshot while it is being initialized - ActionFuture delete = client.admin().cluster().prepareDeleteSnapshot("repository", "snap").execute(); - - // The deletion must set the snapshot in the ABORTED state - assertBusy(() -> { - SnapshotsStatusResponse status = - client.admin().cluster().prepareSnapshotStatus("repository").setSnapshots("snap").get(); - assertThat(status.getSnapshots().iterator().next().getState(), equalTo(State.ABORTED)); - }); - - // Now unblock the repository - unblockNode("repository", internalCluster().getMasterName()); - blocked = false; - - assertAcked(delete.get()); - expectThrows(SnapshotMissingException.class, () -> - client.admin().cluster().prepareGetSnapshots("repository").setSnapshots("snap").get() - .getSnapshots("repository")); - - assertFalse("Expecting snapshot state to be updated", states.isEmpty()); - assertFalse("Expecting snapshot to be aborted and not started at all", states.contains(State.STARTED)); - } finally { - internalCluster().getInstances(ClusterService.class).forEach(clusterService -> clusterService.removeListener(listener)); - if (blocked) { - unblockNode("repository", internalCluster().getMasterName()); - } - } - } - public void testRestoreIncreasesPrimaryTerms() { final String indexName = randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT); createIndex(indexName, Settings.builder() diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index e56908511a33d..62ae1d05e9510 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -196,12 +196,14 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.elasticsearch.action.support.ActionTestUtils.assertNoFailureListener; import static org.elasticsearch.env.Environment.PATH_HOME_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.mockito.Mockito.mock; public class SnapshotResiliencyTests extends ESTestCase { @@ -493,6 +495,85 @@ public void run() { assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0))); } + public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { + setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); + + String repoName = "repo"; + String snapshotName = "snapshot"; + final String index = "test"; + + final int shards = randomIntBetween(1, 10); + final int documents = randomIntBetween(2, 100); + TestClusterNode masterNode = + testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state()); + + final AtomicBoolean documentCountVerified = new AtomicBoolean(); + + final StepListener createSnapshotResponseStepListener = new StepListener<>(); + + continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> { + final AtomicBoolean initiatedSnapshot = new AtomicBoolean(false); + for (int i = 0; i < documents; ++i) { + masterNode.client.bulk( + new BulkRequest().add(new IndexRequest(index).source(Map.of("foo" + i, "bar", "num", i))).setRefreshPolicy( + WriteRequest.RefreshPolicy.IMMEDIATE), + assertNoFailureListener( + bulkResponse -> { + assertFalse("Failures in bulkresponse: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures()); + if (initiatedSnapshot.compareAndSet(false, true)) { + masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(true).execute(createSnapshotResponseStepListener); + } + })); + } + }); + + final String restoredIndex = "restored"; + + final StepListener restoreSnapshotResponseStepListener = new StepListener<>(); + + continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> masterNode.client.admin().cluster().restoreSnapshot( + new RestoreSnapshotRequest(repoName, snapshotName) + .renamePattern(index).renameReplacement(restoredIndex).waitForCompletion(true), + restoreSnapshotResponseStepListener)); + + final StepListener searchResponseStepListener = new StepListener<>(); + + continueOrDie(restoreSnapshotResponseStepListener, restoreSnapshotResponse -> { + assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards()); + masterNode.client.search( + new SearchRequest(restoredIndex).source(new SearchSourceBuilder().size(documents).trackTotalHits(true)), + searchResponseStepListener); + }); + + continueOrDie(searchResponseStepListener, r -> { + final long hitCount = r.getHits().getTotalHits().value; + assertThat( + (int) hitCount + 1, + lessThanOrEqualTo(((Map) masterNode.clusterService.state().metaData().index(restoredIndex).mapping() + .sourceAsMap().get("properties")).size()) + ); + documentCountVerified.set(true); + }); + + runUntil(documentCountVerified::get, TimeUnit.MINUTES.toMillis(5L)); + + assertNotNull(createSnapshotResponseStepListener.result()); + assertNotNull(restoreSnapshotResponseStepListener.result()); + assertTrue(documentCountVerified.get()); + SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); + assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); + final Repository repository = masterNode.repositoriesService.repository(repoName); + Collection snapshotIds = repository.getRepositoryData().getSnapshotIds(); + assertThat(snapshotIds, hasSize(1)); + + final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next()); + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + assertThat(snapshotInfo.indices(), containsInAnyOrder(index)); + assertEquals(shards, snapshotInfo.successfulShards()); + assertEquals(0, snapshotInfo.failedShards()); + } + private StepListener createRepoAndIndex(TestClusterNode masterNode, String repoName, String index, int shards) { final AdminClient adminClient = masterNode.client.admin(); diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index d21f3db81e69c..00e8cb5de262d 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -276,9 +276,11 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b // We do some checks in case there is a consistent state for a blob to prevent turning it inconsistent. final boolean hasConsistentContent = relevantActions.size() == 1 && relevantActions.get(0).operation == Operation.PUT; - if (BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)) { + if (BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName) + || blobName.startsWith(BlobStoreRepository.METADATA_PREFIX)) { // TODO: Ensure that it is impossible to ever decrement the generation id stored in index.latest then assert that - // it never decrements here + // it never decrements here. Same goes for the metadata, ensure that we never overwrite newer with older + // metadata. } else if (blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)) { if (hasConsistentContent) { if (basePath().buildAsString().equals(path().buildAsString())) { diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index 81934fe93bd8a..48a6737d82842 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.snapshots.mockstore; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.blobstore.BlobContainer; @@ -158,19 +159,19 @@ public void testOverwriteSnapshotInfoBlob() { // We create a snap- blob for snapshot "foo" in the first generation final SnapshotId snapshotId = new SnapshotId("foo", UUIDs.randomBase64UUID()); repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(), - -1L, false, Collections.emptyMap()); + -1L, false, MetaData.EMPTY_META_DATA, Collections.emptyMap()); // We try to write another snap- blob for "foo" in the next generation. It fails because the content differs. final AssertionError assertionError = expectThrows(AssertionError.class, () -> repository.finalizeSnapshot( snapshotId, Collections.emptyList(), 1L, null, 6, Collections.emptyList(), - 0, false, Collections.emptyMap())); + 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap())); assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n but: was <5>")); // We try to write yet another snap- blob for "foo" in the next generation. // It passes cleanly because the content of the blob except for the timestamps. repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(), - 0, false, Collections.emptyMap()); + 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap()); } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index a552e7ac54664..774e8651331cd 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -24,7 +24,6 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; @@ -38,10 +37,8 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.plugins.RepositoryPlugin; -import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.fs.FsRepository; -import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -101,8 +98,6 @@ public long getFailureCount() { private final String randomPrefix; - private volatile boolean blockOnInitialization; - private volatile boolean blockOnControlFiles; private volatile boolean blockOnDataFiles; @@ -125,21 +120,12 @@ public MockRepository(RepositoryMetaData metadata, Environment environment, maximumNumberOfFailures = metadata.settings().getAsLong("max_failure_number", 100L); blockOnControlFiles = metadata.settings().getAsBoolean("block_on_control", false); blockOnDataFiles = metadata.settings().getAsBoolean("block_on_data", false); - blockOnInitialization = metadata.settings().getAsBoolean("block_on_init", false); blockAndFailOnWriteSnapFile = metadata.settings().getAsBoolean("block_on_snap", false); randomPrefix = metadata.settings().get("random", "default"); waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L); logger.info("starting mock repository with random prefix {}", randomPrefix); } - @Override - public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData clusterMetadata) { - if (blockOnInitialization) { - blockExecution(); - } - super.initializeSnapshot(snapshotId, indices, clusterMetadata); - } - private static RepositoryMetaData overrideSettings(RepositoryMetaData metadata, Environment environment) { // TODO: use another method of testing not being able to read the test file written by the master... // this is super duper hacky @@ -173,7 +159,6 @@ public synchronized void unblock() { // Clean blocking flags, so we wouldn't try to block again blockOnDataFiles = false; blockOnControlFiles = false; - blockOnInitialization = false; blockOnWriteIndexFile = false; blockAndFailOnWriteSnapFile = false; this.notifyAll(); @@ -199,7 +184,7 @@ private synchronized boolean blockExecution() { logger.debug("[{}] Blocking execution", metadata.name()); boolean wasBlocked = false; try { - while (blockOnDataFiles || blockOnControlFiles || blockOnInitialization || blockOnWriteIndexFile || + while (blockOnDataFiles || blockOnControlFiles || blockOnWriteIndexFile || blockAndFailOnWriteSnapFile) { blocked = true; this.wait(); @@ -377,6 +362,9 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, final boolean failIfAlreadyExists) throws IOException { final Random random = RandomizedContext.current().getRandom(); + if (blobName.startsWith("index-") && blockOnWriteIndexFile) { + blockExecutionAndFail(blobName); + } if ((delegate() instanceof FsBlobContainer) && (random.nextBoolean())) { // Simulate a failure between the write and move operation in FsBlobContainer final String tempBlobName = FsBlobContainer.tempBlobName(blobName); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 418cee00c0d21..2aca8dd62938c 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -93,14 +93,10 @@ public RepositoryData getRepositoryData() { return new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), map); } - @Override - public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { - } - @Override public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, - boolean includeGlobalState, Map userMetadata) { + boolean includeGlobalState, MetaData metaData, Map userMetadata) { return null; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 0231681666f5a..43c31fbf52123 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -248,15 +248,10 @@ public RepositoryData getRepositoryData() { return new RepositoryData(1, copiedSnapshotIds, snapshotStates, indexSnapshots); } - @Override - public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { - throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); - } - @Override public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, - Map userMetadata) { + MetaData metaData, Map userMetadata) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java index f6c3124c9be9f..707f49c9c18f4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java @@ -41,6 +41,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.function.Function; import java.util.function.Supplier; @@ -73,7 +74,9 @@ public final class SourceOnlySnapshotRepository extends FilterRepository { } @Override - public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { + public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, + List shardFailures, long repositoryStateId, boolean includeGlobalState, MetaData metaData, + Map userMetadata) { // we process the index metadata at snapshot time. This means if somebody tries to restore // a _source only snapshot with a plain repository it will be just fine since we already set the // required engine, that the index is read-only and the mapping to a default mapping @@ -100,7 +103,8 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met indexMetadataBuilder.settingsVersion(1 + indexMetadataBuilder.settingsVersion()); builder.put(indexMetadataBuilder); } - super.initializeSnapshot(snapshotId, indices, builder.build()); + return super.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId, + includeGlobalState, builder.build(), userMetadata); } catch (IOException ex) { throw new UncheckedIOException(ex); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index 14aae50b3b1cd..a760d04ae50f1 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RecoverySource; @@ -65,7 +64,6 @@ import java.io.IOException; import java.nio.file.Path; -import java.util.Arrays; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -194,9 +192,6 @@ public void testRestoreMinmal() throws IOException { try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) { IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(); runAsSnapshot(shard.getThreadPool(), () -> { - repository.initializeSnapshot(snapshotId, Arrays.asList(indexId), - MetaData.builder().put(shard.indexSettings() - .getIndexMetaData(), false).build()); repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(), indexShardSnapshotStatus); }); From 78a18f415fc7894a8e62f59604e97105c359f3c3 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 19 Aug 2019 05:27:44 +0200 Subject: [PATCH 2/7] fix test --- .../snapshots/SourceOnlySnapshotShardTests.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index a760d04ae50f1..d882cb1e2ffba 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RecoverySource; @@ -64,6 +65,7 @@ import java.io.IOException; import java.nio.file.Path; +import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -194,6 +196,10 @@ public void testRestoreMinmal() throws IOException { runAsSnapshot(shard.getThreadPool(), () -> { repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(), indexShardSnapshotStatus); + repository.finalizeSnapshot(snapshotId, Collections.singletonList(indexId), + indexShardSnapshotStatus.asCopy().getStartTime(), null, 1, Collections.emptyList(), + repository.getRepositoryData().getGenId(), true, + MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap()); }); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount()); From 16c6213ef5ad6816998809d09721defe8a34a538 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 19 Aug 2019 06:36:14 +0200 Subject: [PATCH 3/7] introduce BwC logic and fix documentation --- .../repositories/FilterRepository.java | 4 ++ .../repositories/Repository.java | 13 ++++ .../blobstore/BlobStoreRepository.java | 30 ++++++++- .../repositories/blobstore/package-info.java | 17 ++++- .../snapshots/SnapshotsService.java | 65 ++++++++++++------- .../RepositoriesServiceTests.java | 5 ++ .../index/shard/RestoreOnlyRepository.java | 4 ++ .../xpack/ccr/repository/CcrRepository.java | 4 ++ .../SourceOnlySnapshotRepository.java | 63 +++++++++++------- 9 files changed, 153 insertions(+), 52 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 28e36a7ae9bd8..916c2810d673a 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -73,6 +73,10 @@ public RepositoryData getRepositoryData() { return in.getRepositoryData(); } + @Override + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { + in.initializeSnapshot(snapshotId, indices, metaData); + } @Override public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 6a67a33d6af18..2b617803ac65a 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -108,6 +108,19 @@ default Repository create(RepositoryMetaData metaData, Function indices, MetaData metaData); + /** * Finalizes snapshotting process *

    diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index bab6e4b924ba6..b9e19705cb29d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -84,6 +84,7 @@ import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryVerificationException; +import org.elasticsearch.snapshots.InvalidSnapshotNameException; import org.elasticsearch.snapshots.SnapshotCreationException; import org.elasticsearch.snapshots.SnapshotException; import org.elasticsearch.snapshots.SnapshotId; @@ -352,6 +353,31 @@ public RepositoryMetaData getMetadata() { return metadata; } + @Override + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData clusterMetaData) { + if (isReadOnly()) { + throw new RepositoryException(metadata.name(), "cannot create snapshot in a readonly repository"); + } + try { + final String snapshotName = snapshotId.getName(); + // check if the snapshot name already exists in the repository + final RepositoryData repositoryData = getRepositoryData(); + if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) { + throw new InvalidSnapshotNameException(metadata.name(), snapshotId.getName(), "snapshot with the same name already exists"); + } + + // Write Global MetaData + globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), true); + + // write the index metadata for each index in the snapshot + for (IndexId index : indices) { + indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), true); + } + } catch (IOException ex) { + throw new SnapshotCreationException(metadata.name(), snapshotId, ex); + } + } + @Override public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { if (isReadOnly()) { @@ -543,7 +569,7 @@ public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId, // We ignore all FileAlreadyExistsException here since otherwise a master failover while in this method will // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the // index or global metadata will be compatible with the segments written in this snapshot as well. - // Failing on an already existing snap-${uuid}.dat below ensures that the index.latest blob is not updated in a way that + // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way that // decrements the generation it points at // Write Global MetaData @@ -554,7 +580,7 @@ public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId, indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false); } } catch (IOException ex) { - throw new SnapshotCreationException(metadata.name(), snapshotId, ex); + throw new SnapshotException(metadata.name(), snapshotId, "failed to write metadata for snapshot", ex); } try { diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java index 7069ae193ca13..5cc98f6c3e99b 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java @@ -118,9 +118,19 @@ * *

    Creating a snapshot in the repository happens in the three steps described in detail below.

    * - *

    Initializing a Snapshot in the Repository

    + *

    Initializing a Snapshot in the Repository (Mixed Version Clusters only)

    * - * TODO: Adjust these docs for https://github.com/elastic/elasticsearch/issues/41581 + *

    In mixed version clusters that contain a node older than + * {@link org.elasticsearch.snapshots.SnapshotsService#NO_REPO_INITIALIZE_VERSION}, creating a snapshot in the repository starts with a + * call to {@link org.elasticsearch.repositories.Repository#initializeSnapshot} which the blob store repository implements via the + * following actions:

    + *
      + *
    1. Verify that no snapshot by the requested name exists.
    2. + *
    3. Write a blob containing the cluster metadata to the root of the blob store repository at {@code /meta-${snapshot-uuid}.dat}
    4. + *
    5. Write the metadata for each index to a blob in that index's directory at + * {@code /indices/${index-snapshot-uuid}/meta-${snapshot-uuid}.dat}
    6. + *
    + * TODO: Remove this section once BwC logic it references is removed * *

    Writing Shard Data (Segments)

    * @@ -156,6 +166,9 @@ * to finalizing the snapshot by invoking {@link org.elasticsearch.repositories.Repository#finalizeSnapshot}. This method executes the * following actions in order:

    *
      + *
    1. Write a blob containing the cluster metadata to the root of the blob store repository at {@code /meta-${snapshot-uuid}.dat}
    2. + *
    3. Write the metadata for each index to a blob in that index's directory at + * {@code /indices/${index-snapshot-uuid}/meta-${snapshot-uuid}.dat}
    4. *
    5. Write the {@link org.elasticsearch.snapshots.SnapshotInfo} blob for the given snapshot to the key {@code /snap-${snapshot-uuid}.dat} * directly under the repository root.
    6. *
    7. Write an updated {@code RepositoryData} blob to the key {@code /index-${N+1}} using the {@code N} determined when initializing the diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index a39edac42a0ff..de961b6c2ce5a 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -26,6 +26,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; @@ -100,7 +101,7 @@ *
    8. On the master node the {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} is called and makes sure that * no snapshot is currently running and registers the new snapshot in cluster state
    9. *
    10. When cluster state is updated - * the {@link #beginSnapshot(SnapshotsInProgress.Entry, boolean, ActionListener)} method kicks in and initializes + * the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, ActionListener)} method kicks in and initializes * the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state
    11. *
    12. Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes * start processing them through {@link SnapshotShardsService#processIndexShardSnapshots(SnapshotsInProgress)} method
    13. @@ -108,12 +109,19 @@ * the {@link SnapshotShardsService#sendSnapshotShardUpdate(Snapshot, ShardId, ShardSnapshotStatus)} method *
    14. When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot * as completed
    15. - *
    16. After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry)} finalizes snapshot in the repository, + *
    17. After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry, MetaData)} finalizes snapshot in the repository, * notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls * {@link #removeSnapshotFromClusterState(Snapshot, SnapshotInfo, Exception)} to remove snapshot from cluster state
    18. *
*/ public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier { + + /** + * Minimum node version which does not use {@link Repository#initializeSnapshot(SnapshotId, List, MetaData)} to write snapshot metadata + * when starting a snapshot. + */ + public static final Version NO_REPO_INITIALIZE_VERSION = Version.V_8_0_0; + private static final Logger logger = LogManager.getLogger(SnapshotsService.class); private final ClusterService clusterService; @@ -304,7 +312,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl if (newSnapshot != null) { final Snapshot current = newSnapshot.snapshot(); assert initializingSnapshots.contains(current); - beginSnapshot(newSnapshot, request.partial(), new ActionListener<>() { + beginSnapshot(newState, newSnapshot, request.partial(), new ActionListener<>() { @Override public void onResponse(final Snapshot snapshot) { initializingSnapshots.remove(snapshot); @@ -373,11 +381,14 @@ private static void validate(final String repositoryName, final String snapshotN *

* Creates snapshot in repository and updates snapshot metadata record with list of shards that needs to be processed. * + * @param clusterState cluster state * @param snapshot snapshot meta data * @param partial allow partial snapshots * @param userCreateSnapshotListener listener */ - private void beginSnapshot(final SnapshotsInProgress.Entry snapshot, final boolean partial, + private void beginSnapshot(final ClusterState clusterState, + final SnapshotsInProgress.Entry snapshot, + final boolean partial, final ActionListener userCreateSnapshotListener) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { @@ -390,15 +401,23 @@ protected void doRun() { assert initializingSnapshots.contains(snapshot.snapshot()); Repository repository = repositoriesService.repository(snapshot.snapshot().getRepository()); - final String snapshotName = snapshot.snapshot().getSnapshotId().getName(); - if (repository.isReadOnly()) { - throw new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository"); - } - // check if the snapshot name already exists in the repository - final RepositoryData repositoryData = repository.getRepositoryData(); - if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) { - throw new InvalidSnapshotNameException( - repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists"); + if (clusterState.nodes().getMinNodeVersion().onOrAfter(NO_REPO_INITIALIZE_VERSION)) { + final String snapshotName = snapshot.snapshot().getSnapshotId().getName(); + if (repository.isReadOnly()) { + throw new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository"); + } + // check if the snapshot name already exists in the repository + final RepositoryData repositoryData = repository.getRepositoryData(); + if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) { + throw new InvalidSnapshotNameException( + repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists"); + } + } else { + // In mixed version clusters we initialize the snapshot in the repository so that in case of a master failover to an + // older version master node snapshot finalization (that assumes initializeSnapshot was called) produces a valid + // snapshot. + repository.initializeSnapshot( + snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaDataForSnapshot(snapshot, clusterState.metaData())); } snapshotCreated = true; @@ -406,7 +425,7 @@ protected void doRun() { if (snapshot.indices().isEmpty()) { // No indices in this snapshot - we are done userCreateSnapshotListener.onResponse(snapshot.snapshot()); - endSnapshot(snapshot); + endSnapshot(snapshot, clusterState.metaData()); return; } clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() { @@ -489,7 +508,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS assert snapshotsInProgress != null; final SnapshotsInProgress.Entry entry = snapshotsInProgress.snapshot(snapshot.snapshot()); assert entry != null; - endSnapshot(entry); + endSnapshot(entry, newState.metaData()); } } }); @@ -538,9 +557,6 @@ public void onNoLongerMaster() { private void cleanupAfterError(Exception exception) { if(snapshotCreated) { try { - // TODO: Get the metadata from cluster state update to be safe? - final MetaData metaData = metaDataForSnapshot(snapshot); - repositoriesService.repository(snapshot.snapshot().getRepository()) .finalizeSnapshot(snapshot.snapshot().getSnapshotId(), snapshot.indices(), @@ -550,7 +566,7 @@ private void cleanupAfterError(Exception exception) { Collections.emptyList(), snapshot.getRepositoryStateId(), snapshot.includeGlobalState(), - metaData, + metaDataForSnapshot(snapshot, clusterService.state().metaData()), snapshot.userMetadata()); } catch (Exception inner) { inner.addSuppressed(exception); @@ -562,9 +578,8 @@ private void cleanupAfterError(Exception exception) { } } - private MetaData metaDataForSnapshot(final SnapshotsInProgress.Entry snapshot) { - MetaData metaData = clusterService.state().metaData(); - if (!snapshot.includeGlobalState()) { + private static MetaData metaDataForSnapshot(SnapshotsInProgress.Entry snapshot, MetaData metaData) { + if (snapshot.includeGlobalState() == false) { // Remove global state from the cluster state MetaData.Builder builder = MetaData.builder(); for (IndexId index : snapshot.indices()) { @@ -720,7 +735,7 @@ public void applyClusterState(ClusterChangedEvent event) { entry -> entry.state().completed() || initializingSnapshots.contains(entry.snapshot()) == false && (entry.state() == State.INIT || completed(entry.shards().values())) - ).forEach(this::endSnapshot); + ).forEach(entry -> endSnapshot(entry, event.state().metaData())); } if (newMaster) { finalizeSnapshotDeletionFromPreviousMaster(event); @@ -967,7 +982,7 @@ private static Tuple, Set> indicesWithMissingShards( * * @param entry snapshot */ - private void endSnapshot(final SnapshotsInProgress.Entry entry) { + private void endSnapshot(SnapshotsInProgress.Entry entry, MetaData metaData) { if (endingSnapshots.add(entry.snapshot()) == false) { return; } @@ -995,7 +1010,7 @@ protected void doRun() { unmodifiableList(shardFailures), entry.getRepositoryStateId(), entry.includeGlobalState(), - metaDataForSnapshot(entry), + metaDataForSnapshot(entry, metaData), entry.userMetadata()); removeSnapshotFromClusterState(snapshot, snapshotInfo, null); logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 37c3f2940a64e..6d987d495543d 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -153,6 +153,11 @@ public RepositoryData getRepositoryData() { return null; } + @Override + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { + + } + @Override public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 2aca8dd62938c..e9260f2a74fad 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -93,6 +93,10 @@ public RepositoryData getRepositoryData() { return new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), map); } + @Override + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { + } + @Override public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 43c31fbf52123..6aff8ef4725f9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -248,6 +248,10 @@ public RepositoryData getRepositoryData() { return new RepositoryData(1, copiedSnapshotIds, snapshotStates, indexSnapshots); } + @Override + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } @Override public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java index 707f49c9c18f4..0973acd554541 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java @@ -73,6 +73,18 @@ public final class SourceOnlySnapshotRepository extends FilterRepository { super(in); } + @Override + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { + // we process the index metadata at snapshot time. This means if somebody tries to restore + // a _source only snapshot with a plain repository it will be just fine since we already set the + // required engine, that the index is read-only and the mapping to a default mapping + try { + super.initializeSnapshot(snapshotId, indices, metadataToSnapshot(indices, metaData)); + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + } + @Override public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, MetaData metaData, @@ -81,35 +93,40 @@ public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indice // a _source only snapshot with a plain repository it will be just fine since we already set the // required engine, that the index is read-only and the mapping to a default mapping try { - MetaData.Builder builder = MetaData.builder(metaData); - for (IndexId indexId : indices) { - IndexMetaData index = metaData.index(indexId.getName()); - IndexMetaData.Builder indexMetadataBuilder = IndexMetaData.builder(index); - // for a minimal restore we basically disable indexing on all fields and only create an index - // that is valid from an operational perspective. ie. it will have all metadata fields like version/ - // seqID etc. and an indexed ID field such that we can potentially perform updates on them or delete documents. - ImmutableOpenMap mappings = index.getMappings(); - Iterator> iterator = mappings.iterator(); - while (iterator.hasNext()) { - ObjectObjectCursor next = iterator.next(); - // we don't need to obey any routing here stuff is read-only anyway and get is disabled - final String mapping = "{ \"" + next.key + "\": { \"enabled\": false, \"_meta\": " + next.value.source().string() - + " } }"; - indexMetadataBuilder.putMapping(next.key, mapping); - } - indexMetadataBuilder.settings(Settings.builder().put(index.getSettings()) - .put(SOURCE_ONLY.getKey(), true) - .put("index.blocks.write", true)); // read-only! - indexMetadataBuilder.settingsVersion(1 + indexMetadataBuilder.settingsVersion()); - builder.put(indexMetadataBuilder); - } return super.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId, - includeGlobalState, builder.build(), userMetadata); + includeGlobalState, metadataToSnapshot(indices, metaData), userMetadata); } catch (IOException ex) { throw new UncheckedIOException(ex); } } + private static MetaData metadataToSnapshot(List indices, MetaData metaData) throws IOException { + MetaData.Builder builder = MetaData.builder(metaData); + for (IndexId indexId : indices) { + IndexMetaData index = metaData.index(indexId.getName()); + IndexMetaData.Builder indexMetadataBuilder = IndexMetaData.builder(index); + // for a minimal restore we basically disable indexing on all fields and only create an index + // that is valid from an operational perspective. ie. it will have all metadata fields like version/ + // seqID etc. and an indexed ID field such that we can potentially perform updates on them or delete documents. + ImmutableOpenMap mappings = index.getMappings(); + Iterator> iterator = mappings.iterator(); + while (iterator.hasNext()) { + ObjectObjectCursor next = iterator.next(); + // we don't need to obey any routing here stuff is read-only anyway and get is disabled + final String mapping = "{ \"" + next.key + "\": { \"enabled\": false, \"_meta\": " + next.value.source().string() + + " } }"; + indexMetadataBuilder.putMapping(next.key, mapping); + } + indexMetadataBuilder.settings(Settings.builder().put(index.getSettings()) + .put(SOURCE_ONLY.getKey(), true) + .put("index.blocks.write", true)); // read-only! + indexMetadataBuilder.settingsVersion(1 + indexMetadataBuilder.settingsVersion()); + builder.put(indexMetadataBuilder); + } + return builder.build(); + } + + @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { From f2e4272610d75d8e98e7b0045d8d7bfa356ba5cc Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 19 Aug 2019 07:50:13 +0200 Subject: [PATCH 4/7] fix empty line change and use constant --- .../java/org/elasticsearch/repositories/FilterRepository.java | 1 + .../org/elasticsearch/snapshots/mockstore/MockRepository.java | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 916c2810d673a..5d926e861a564 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -77,6 +77,7 @@ public RepositoryData getRepositoryData() { public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { in.initializeSnapshot(snapshotId, indices, metaData); } + @Override public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 774e8651331cd..c8f6c289f2f93 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -38,6 +38,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.threadpool.ThreadPool; @@ -362,7 +363,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, final boolean failIfAlreadyExists) throws IOException { final Random random = RandomizedContext.current().getRandom(); - if (blobName.startsWith("index-") && blockOnWriteIndexFile) { + if (blobName.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX) && blockOnWriteIndexFile) { blockExecutionAndFail(blobName); } if ((delegate() instanceof FsBlobContainer) && (random.nextBoolean())) { From d1b32c5bc776802f0d0c8fa1e1950496df0fe7c0 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 19 Aug 2019 08:04:09 +0200 Subject: [PATCH 5/7] nicer looking test --- .../snapshots/SnapshotResiliencyTests.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 62ae1d05e9510..c864bc2b95550 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -507,16 +507,14 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { TestClusterNode masterNode = testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state()); - final AtomicBoolean documentCountVerified = new AtomicBoolean(); - final StepListener createSnapshotResponseStepListener = new StepListener<>(); continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> { final AtomicBoolean initiatedSnapshot = new AtomicBoolean(false); for (int i = 0; i < documents; ++i) { masterNode.client.bulk( - new BulkRequest().add(new IndexRequest(index).source(Map.of("foo" + i, "bar", "num", i))).setRefreshPolicy( - WriteRequest.RefreshPolicy.IMMEDIATE), + new BulkRequest().add(new IndexRequest(index).source(Map.of("foo" + i, "bar", "num", i))) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), assertNoFailureListener( bulkResponse -> { assertFalse("Failures in bulkresponse: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures()); @@ -534,8 +532,7 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> masterNode.client.admin().cluster().restoreSnapshot( new RestoreSnapshotRequest(repoName, snapshotName) - .renamePattern(index).renameReplacement(restoredIndex).waitForCompletion(true), - restoreSnapshotResponseStepListener)); + .renamePattern(index).renameReplacement(restoredIndex).waitForCompletion(true), restoreSnapshotResponseStepListener)); final StepListener searchResponseStepListener = new StepListener<>(); @@ -546,9 +543,12 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { searchResponseStepListener); }); + final AtomicBoolean documentCountVerified = new AtomicBoolean(); + continueOrDie(searchResponseStepListener, r -> { final long hitCount = r.getHits().getTotalHits().value; assertThat( + "Documents were restored but the restored index mapping was older than some documents and misses some of their fields", (int) hitCount + 1, lessThanOrEqualTo(((Map) masterNode.clusterService.state().metaData().index(restoredIndex).mapping() .sourceAsMap().get("properties")).size()) @@ -560,7 +560,6 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { assertNotNull(createSnapshotResponseStepListener.result()); assertNotNull(restoreSnapshotResponseStepListener.result()); - assertTrue(documentCountVerified.get()); SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); final Repository repository = masterNode.repositoriesService.repository(repoName); From 70b6379402c6a7b2344f5c365e0ff5b3c37c83a7 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 19 Aug 2019 08:43:59 +0200 Subject: [PATCH 6/7] adjust failing test --- .../elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 75c2e1759434a..b11cf9107e30b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -3448,7 +3448,7 @@ public void testSnapshotSucceedsAfterSnapshotFailure() throws Exception { assertThat(shardFailure.reason(), containsString("Random IOException")); } } - } catch (SnapshotCreationException | RepositoryException ex) { + } catch (SnapshotException | RepositoryException ex) { // sometimes, the snapshot will fail with a top level I/O exception assertThat(ExceptionsHelper.stackTrace(ex), containsString("Random IOException")); } From f844f8ad9fa0af86c5288cd346b1c4a9654cd896 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 26 Aug 2019 19:39:25 +0200 Subject: [PATCH 7/7] CR: comments --- .../blobstore/BlobStoreRepository.java | 11 ---------- .../snapshots/SnapshotsService.java | 22 +++++++++---------- .../DedicatedClusterSnapshotRestoreIT.java | 1 + .../snapshots/SnapshotResiliencyTests.java | 5 +++-- .../snapshots/mockstore/MockRepository.java | 4 ---- 5 files changed, 14 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 4b20a60e3b1fe..78f399dabf7c3 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -86,7 +86,6 @@ import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryVerificationException; -import org.elasticsearch.snapshots.InvalidSnapshotNameException; import org.elasticsearch.snapshots.SnapshotCreationException; import org.elasticsearch.snapshots.SnapshotException; import org.elasticsearch.snapshots.SnapshotId; @@ -357,17 +356,7 @@ public RepositoryMetaData getMetadata() { @Override public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData clusterMetaData) { - if (isReadOnly()) { - throw new RepositoryException(metadata.name(), "cannot create snapshot in a readonly repository"); - } try { - final String snapshotName = snapshotId.getName(); - // check if the snapshot name already exists in the repository - final RepositoryData repositoryData = getRepositoryData(); - if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) { - throw new InvalidSnapshotNameException(metadata.name(), snapshotId.getName(), "snapshot with the same name already exists"); - } - // Write Global MetaData globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), true); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 0cdba91966bd9..bbd6938802530 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -407,18 +407,16 @@ protected void doRun() { assert initializingSnapshots.contains(snapshot.snapshot()); Repository repository = repositoriesService.repository(snapshot.snapshot().getRepository()); - if (clusterState.nodes().getMinNodeVersion().onOrAfter(NO_REPO_INITIALIZE_VERSION)) { - final String snapshotName = snapshot.snapshot().getSnapshotId().getName(); - if (repository.isReadOnly()) { - throw new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository"); - } - // check if the snapshot name already exists in the repository - final RepositoryData repositoryData = repository.getRepositoryData(); - if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) { - throw new InvalidSnapshotNameException( - repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists"); - } - } else { + if (repository.isReadOnly()) { + throw new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository"); + } + final String snapshotName = snapshot.snapshot().getSnapshotId().getName(); + // check if the snapshot name already exists in the repository + if (repository.getRepositoryData().getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) { + throw new InvalidSnapshotNameException( + repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists"); + } + if (clusterState.nodes().getMinNodeVersion().onOrAfter(NO_REPO_INITIALIZE_VERSION) == false) { // In mixed version clusters we initialize the snapshot in the repository so that in case of a master failover to an // older version master node snapshot finalization (that assumes initializeSnapshot was called) produces a valid // snapshot. diff --git a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index e07107f790b49..d9bc6dd02209c 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -931,6 +931,7 @@ public void testMasterAndDataShutdownDuringSnapshot() throws Exception { } } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/25281") public void testMasterShutdownDuringFailedSnapshot() throws Exception { logger.info("--> starting two master nodes and two data nodes"); internalCluster().startMasterOnlyNodes(2); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 841823b80705e..a3a668b725241 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -512,8 +512,9 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> { final AtomicBoolean initiatedSnapshot = new AtomicBoolean(false); for (int i = 0; i < documents; ++i) { + // Index a few documents with different field names so we trigger a dynamic mapping update for each of them masterNode.client.bulk( - new BulkRequest().add(new IndexRequest(index).source(Map.of("foo" + i, "bar", "num", i))) + new BulkRequest().add(new IndexRequest(index).source(Map.of("foo" + i, "bar"))) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), assertNoFailureListener( bulkResponse -> { @@ -549,7 +550,7 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { final long hitCount = r.getHits().getTotalHits().value; assertThat( "Documents were restored but the restored index mapping was older than some documents and misses some of their fields", - (int) hitCount + 1, + (int) hitCount, lessThanOrEqualTo(((Map) masterNode.clusterService.state().metaData().index(restoredIndex).mapping() .sourceAsMap().get("properties")).size()) ); diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 015b41a490da4..fa33f8aef8679 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -39,7 +39,6 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; -import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.threadpool.ThreadPool; @@ -370,9 +369,6 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, final boolean failIfAlreadyExists) throws IOException { final Random random = RandomizedContext.current().getRandom(); - if (blobName.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX) && blockOnWriteIndexFile) { - blockExecutionAndFail(blobName); - } if ((delegate() instanceof FsBlobContainer) && (random.nextBoolean())) { // Simulate a failure between the write and move operation in FsBlobContainer final String tempBlobName = FsBlobContainer.tempBlobName(blobName);