diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 35cfe4cdfdc0d..922c8b818cb91 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -113,6 +113,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -366,7 +367,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea } else { try { final Map rootBlobs = blobContainer().listBlobs(); - final RepositoryData repositoryData = getRepositoryData(latestGeneration(rootBlobs.keySet())); + final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs); // Cache the indices that were found before writing out the new index-N blob so that a stuck master will never // delete an index that was created by another master node after writing this index-N blob. final Map foundIndices = blobStore().blobContainer(indicesPath()).children(); @@ -377,6 +378,30 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea } } + /** + * Loads {@link RepositoryData} ensuring that it is consistent with the given {@code rootBlobs} as well of the assumed generation. + * + * @param repositoryStateId Expected repository generation + * @param rootBlobs Blobs at the repository root + * @return RepositoryData + */ + private RepositoryData safeRepositoryData(long repositoryStateId, Map rootBlobs) { + final long generation = latestGeneration(rootBlobs.keySet()); + final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, repositoryStateId)); + if (genToLoad > generation) { + // It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just + // debug log it. Any blobs leaked as a result of an inconsistent listing here will be cleaned up in a subsequent cleanup or + // snapshot delete run anyway. + logger.debug("Determined repository's generation from its contents to [" + generation + "] but " + + "current generation is at least [" + genToLoad + "]"); + } + if (genToLoad != repositoryStateId) { + throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" + + repositoryStateId + "], actual current generation [" + genToLoad + "]"); + } + return getRepositoryData(genToLoad); + } + /** * After updating the {@link RepositoryData} each of the shards directories is individually first moved to the next shard generation * and then has all now unreferenced blobs in it deleted. @@ -604,14 +629,8 @@ public void cleanup(long repositoryStateId, boolean writeShardGens, ActionListen if (isReadOnly()) { throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository"); } - final RepositoryData repositoryData = getRepositoryData(); - if (repositoryData.getGenId() != repositoryStateId) { - // Check that we are working on the expected repository version before gathering the data to clean up - throw new RepositoryException(metadata.name(), "concurrent modification of the repository before cleanup started, " + - "expected current generation [" + repositoryStateId + "], actual current generation [" - + repositoryData.getGenId() + "]"); - } Map rootBlobs = blobContainer().listBlobs(); + final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs); final Map foundIndices = blobStore().blobContainer(indicesPath()).children(); final Set survivingIndexIds = repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet()); @@ -897,12 +916,36 @@ public void endVerification(String seed) { } } + // Tracks the latest known repository generation in a best-effort way to detect inconsistent listing of root level index-N blobs + // and concurrent modifications. + // Protected for use in MockEventuallyConsistentRepository + protected final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN); + @Override public RepositoryData getRepositoryData() { - try { - return getRepositoryData(latestIndexBlobId()); - } catch (IOException ioe) { - throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe); + // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository. + while (true) { + final long generation; + try { + generation = latestIndexBlobId(); + } catch (IOException ioe) { + throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe); + } + final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation)); + if (genToLoad > generation) { + logger.info("Determined repository generation [" + generation + + "] from repository contents but correct generation must be at least [" + genToLoad + "]"); + } + try { + return getRepositoryData(genToLoad); + } catch (RepositoryException e) { + if (genToLoad != latestKnownRepoGen.get()) { + logger.warn("Failed to load repository data generation [" + genToLoad + + "] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e); + continue; + } + throw e; + } } } @@ -920,6 +963,12 @@ private RepositoryData getRepositoryData(long indexGen) { return RepositoryData.snapshotsFromXContent(parser, indexGen); } } catch (IOException ioe) { + // If we fail to load the generation we tracked in latestKnownRepoGen we reset it. + // This is done as a fail-safe in case a user manually deletes the contents of the repository in which case subsequent + // operations must start from the EMPTY_REPO_GEN again + if (latestKnownRepoGen.compareAndSet(indexGen, RepositoryData.EMPTY_REPO_GEN)) { + logger.warn("Resetting repository generation tracker because we failed to read generation [" + indexGen + "]", ioe); + } throw new RepositoryException(metadata.name(), "could not read repository data from index blob", ioe); } } @@ -945,11 +994,21 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long exp "] - possibly due to simultaneous snapshot deletion requests"); } final long newGen = currentGen + 1; + if (latestKnownRepoGen.get() >= newGen) { + throw new IllegalArgumentException( + "Tried writing generation [" + newGen + "] but repository is at least at generation [" + newGen + "] already"); + } // write the index file final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen); logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob); writeAtomic(indexBlob, BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true); + final long latestKnownGen = latestKnownRepoGen.updateAndGet(known -> Math.max(known, newGen)); + if (newGen < latestKnownGen) { + // Don't mess up the index.latest blob + throw new IllegalStateException( + "Wrote generation [" + newGen + "] but latest known repo gen concurrently changed to [" + latestKnownGen + "]"); + } // write the current generation to the index-latest file final BytesReference genBytes; try (BytesStreamOutput bStream = new BytesStreamOutput()) { diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index f1a6cca1bc0e6..a5f49edcd79c7 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -396,7 +396,7 @@ public void testConcurrentSnapshotCreateAndDelete() { final StepListener createAnotherSnapshotResponseStepListener = new StepListener<>(); continueOrDie(deleteSnapshotStepListener, acknowledgedResponse -> masterNode.client.admin().cluster() - .prepareCreateSnapshot(repoName, snapshotName).execute(createAnotherSnapshotResponseStepListener)); + .prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).execute(createAnotherSnapshotResponseStepListener)); continueOrDie(createAnotherSnapshotResponseStepListener, createSnapshotResponse -> assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS)); @@ -1133,7 +1133,7 @@ protected void assertSnapshotOrGenericThread() { } else { return metaData -> { final Repository repository = new MockEventuallyConsistentRepository( - metaData, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext); + metaData, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext, random()); repository.start(); return repository; }; diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index 0b5d2c4f858d8..9727dc5f28283 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -47,6 +47,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; @@ -64,6 +65,8 @@ */ public class MockEventuallyConsistentRepository extends BlobStoreRepository { + private final Random random; + private final Context context; private final NamedXContentRegistry namedXContentRegistry; @@ -72,10 +75,12 @@ public MockEventuallyConsistentRepository( final RepositoryMetaData metadata, final NamedXContentRegistry namedXContentRegistry, final ThreadPool threadPool, - final Context context) { + final Context context, + final Random random) { super(metadata, namedXContentRegistry, threadPool, BlobPath.cleanPath()); this.context = context; this.namedXContentRegistry = namedXContentRegistry; + this.random = random; } // Filters out all actions that are super-seeded by subsequent actions @@ -107,6 +112,9 @@ protected BlobStore createBlobStore() { */ public static final class Context { + // Eventual consistency is only simulated as long as this flag is false + private boolean consistent; + private final List actions = new ArrayList<>(); /** @@ -117,6 +125,7 @@ public void forceConsistent() { final List consistentActions = consistentView(actions); actions.clear(); actions.addAll(consistentActions); + consistent = true; } } } @@ -240,14 +249,14 @@ public Map listBlobs() { ensureNotClosed(); final String thisPath = path.buildAsString(); synchronized (context.actions) { - return consistentView(context.actions).stream() + return maybeMissLatestIndexN(consistentView(context.actions).stream() .filter( action -> action.path.startsWith(thisPath) && action.path.substring(thisPath.length()).indexOf('/') == -1 && action.operation == Operation.PUT) .collect( Collectors.toMap( action -> action.path.substring(thisPath.length()), - action -> new PlainBlobMetaData(action.path.substring(thisPath.length()), action.data.length))); + action -> new PlainBlobMetaData(action.path.substring(thisPath.length()), action.data.length)))); } } @@ -268,9 +277,21 @@ public Map children() { @Override public Map listBlobsByPrefix(String blobNamePrefix) { - return Maps.ofEntries( - listBlobs().entrySet().stream().filter(entry -> entry.getKey().startsWith(blobNamePrefix)).collect(Collectors.toList()) - ); + return maybeMissLatestIndexN( + Maps.ofEntries(listBlobs().entrySet().stream().filter(entry -> entry.getKey().startsWith(blobNamePrefix)) + .collect(Collectors.toList()))); + } + + // Randomly filter out the latest /index-N blob from a listing to test that tracking of it in latestKnownRepoGen + // overrides an inconsistent listing + private Map maybeMissLatestIndexN(Map listing) { + // Only filter out latest index-N at the repo root and only as long as we're not in a forced consistent state + if (path.parent() == null && context.consistent == false && random.nextBoolean()) { + final Map filtered = new HashMap<>(listing); + filtered.remove(BlobStoreRepository.INDEX_FILE_PREFIX + latestKnownRepoGen.get()); + return Map.copyOf(filtered); + } + return listing; } @Override diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index b5756c89377ea..5fdfbe9a93a4d 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -50,7 +50,7 @@ public void testReadAfterWriteConsistently() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) { repository.start(); final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -70,7 +70,7 @@ public void testReadAfterWriteAfterReadThrows() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) { repository.start(); final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -86,7 +86,7 @@ public void testReadAfterDeleteAfterWriteThrows() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) { repository.start(); final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -104,7 +104,7 @@ public void testOverwriteRandomBlobFails() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) { repository.start(); final BlobContainer container = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -121,7 +121,7 @@ public void testOverwriteShardSnapBlobFails() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) { repository.start(); final BlobContainer container = repository.blobStore().blobContainer(repository.basePath().add("indices").add("someindex").add("0")); @@ -143,7 +143,7 @@ public void testOverwriteSnapshotInfoBlob() { new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10))); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), threadPool, blobStoreContext)) { + xContentRegistry(), threadPool, blobStoreContext, random())) { repository.start(); // We create a snap- blob for snapshot "foo" in the first generation