Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track Repository Gen. in BlobStoreRepository (#48944) #49116

Merged
merged 1 commit into from
Nov 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -372,7 +373,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea
} else {
try {
final Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
final RepositoryData repositoryData = getRepositoryData(latestGeneration(rootBlobs.keySet()));
final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
// delete an index that was created by another master node after writing this index-N blob.
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
Expand All @@ -383,6 +384,30 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea
}
}

/**
 * Loads {@link RepositoryData} ensuring that it is consistent with the given {@code rootBlobs} as well as the assumed generation.
 * <p>
 * The tracked {@code latestKnownRepoGen} is first raised to at least {@code repositoryStateId}; the resulting value is the
 * generation that gets loaded, provided it matches the generation the caller expects.
 *
 * @param repositoryStateId Expected repository generation
 * @param rootBlobs Blobs at the repository root
 * @return RepositoryData
 * @throws RepositoryException if the tracked generation differs from {@code repositoryStateId}, or if loading the
 *                             repository data for that generation fails
 */
private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetaData> rootBlobs) {
    // Generation derived purely from the listing of the root blobs; only used for the consistency diagnostics below.
    final long generation = latestGeneration(rootBlobs.keySet());
    final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, repositoryStateId));
    if (genToLoad > generation) {
        // It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
        // debug log it. Any blobs leaked as a result of an inconsistent listing here will be cleaned up in a subsequent cleanup or
        // snapshot delete run anyway.
        // Parameterized so that the message is only assembled when debug logging is actually enabled.
        logger.debug("Determined repository's generation from its contents to [{}] but current generation is at least [{}]",
            generation, genToLoad);
    }
    if (genToLoad != repositoryStateId) {
        // The tracker already knows about a newer generation than the one the caller based its operation on.
        throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" +
            repositoryStateId + "], actual current generation [" + genToLoad + "]");
    }
    return getRepositoryData(genToLoad);
}

/**
* After updating the {@link RepositoryData} each of the shards directories is individually first moved to the next shard generation
* and then has all now unreferenced blobs in it deleted.
Expand Down Expand Up @@ -610,14 +635,8 @@ public void cleanup(long repositoryStateId, boolean writeShardGens, ActionListen
if (isReadOnly()) {
throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository");
}
final RepositoryData repositoryData = getRepositoryData();
if (repositoryData.getGenId() != repositoryStateId) {
// Check that we are working on the expected repository version before gathering the data to clean up
throw new RepositoryException(metadata.name(), "concurrent modification of the repository before cleanup started, " +
"expected current generation [" + repositoryStateId + "], actual current generation ["
+ repositoryData.getGenId() + "]");
}
Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
final Set<String> survivingIndexIds =
repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
Expand Down Expand Up @@ -903,12 +922,36 @@ public void endVerification(String seed) {
}
}

// Tracks the latest known repository generation in a best-effort way to detect inconsistent listing of root level index-N blobs
// and concurrent modifications.
// Protected for use in MockEventuallyConsistentRepository
protected final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN);

@Override
public RepositoryData getRepositoryData() {
try {
return getRepositoryData(latestIndexBlobId());
} catch (IOException ioe) {
throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
while (true) {
final long generation;
try {
generation = latestIndexBlobId();
} catch (IOException ioe) {
throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
}
final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
if (genToLoad > generation) {
logger.info("Determined repository generation [" + generation
+ "] from repository contents but correct generation must be at least [" + genToLoad + "]");
}
try {
return getRepositoryData(genToLoad);
} catch (RepositoryException e) {
if (genToLoad != latestKnownRepoGen.get()) {
logger.warn("Failed to load repository data generation [" + genToLoad +
"] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e);
continue;
}
throw e;
}
}
}

Expand All @@ -926,6 +969,12 @@ private RepositoryData getRepositoryData(long indexGen) {
return RepositoryData.snapshotsFromXContent(parser, indexGen);
}
} catch (IOException ioe) {
// If we fail to load the generation we tracked in latestKnownRepoGen we reset it.
// This is done as a fail-safe in case a user manually deletes the contents of the repository in which case subsequent
// operations must start from the EMPTY_REPO_GEN again
if (latestKnownRepoGen.compareAndSet(indexGen, RepositoryData.EMPTY_REPO_GEN)) {
logger.warn("Resetting repository generation tracker because we failed to read generation [" + indexGen + "]", ioe);
}
throw new RepositoryException(metadata.name(), "could not read repository data from index blob", ioe);
}
}
Expand All @@ -951,11 +1000,21 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long exp
"] - possibly due to simultaneous snapshot deletion requests");
}
final long newGen = currentGen + 1;
if (latestKnownRepoGen.get() >= newGen) {
throw new IllegalArgumentException(
"Tried writing generation [" + newGen + "] but repository is at least at generation [" + newGen + "] already");
}
// write the index file
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
writeAtomic(indexBlob,
BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
final long latestKnownGen = latestKnownRepoGen.updateAndGet(known -> Math.max(known, newGen));
if (newGen < latestKnownGen) {
// Don't mess up the index.latest blob
throw new IllegalStateException(
"Wrote generation [" + newGen + "] but latest known repo gen concurrently changed to [" + latestKnownGen + "]");
}
// write the current generation to the index-latest file
final BytesReference genBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ public void testConcurrentSnapshotCreateAndDelete() {
final StepListener<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new StepListener<>();

continueOrDie(deleteSnapshotStepListener, acknowledgedResponse -> masterNode.client.admin().cluster()
.prepareCreateSnapshot(repoName, snapshotName).execute(createAnotherSnapshotResponseStepListener));
.prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).execute(createAnotherSnapshotResponseStepListener));
continueOrDie(createAnotherSnapshotResponseStepListener, createSnapshotResponse ->
assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS));

Expand Down Expand Up @@ -1146,7 +1146,7 @@ protected void assertSnapshotOrGenericThread() {
} else {
return metaData -> {
final Repository repository = new MockEventuallyConsistentRepository(
metaData, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext);
metaData, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext, random());
repository.start();
return repository;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@
import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
Expand All @@ -63,18 +65,22 @@
*/
public class MockEventuallyConsistentRepository extends BlobStoreRepository {

private final Random random;

private final Context context;

private final NamedXContentRegistry namedXContentRegistry;

public MockEventuallyConsistentRepository(
RepositoryMetaData metadata,
NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool,
Context context) {
super(metadata,false, namedXContentRegistry, threadPool);
final RepositoryMetaData metadata,
final NamedXContentRegistry namedXContentRegistry,
final ThreadPool threadPool,
final Context context,
final Random random) {
super(metadata, false, namedXContentRegistry, threadPool);
this.context = context;
this.namedXContentRegistry = namedXContentRegistry;
this.random = random;
}

// Filters out all actions that are superseded by subsequent actions
Expand Down Expand Up @@ -111,6 +117,9 @@ public BlobPath basePath() {
*/
public static final class Context {

// Eventual consistency is only simulated as long as this flag is false
private boolean consistent;

private final List<BlobStoreAction> actions = new ArrayList<>();

/**
Expand All @@ -121,6 +130,7 @@ public void forceConsistent() {
final List<BlobStoreAction> consistentActions = consistentView(actions);
actions.clear();
actions.addAll(consistentActions);
consistent = true;
}
}
}
Expand Down Expand Up @@ -244,14 +254,14 @@ public Map<String, BlobMetaData> listBlobs() {
ensureNotClosed();
final String thisPath = path.buildAsString();
synchronized (context.actions) {
return consistentView(context.actions).stream()
return maybeMissLatestIndexN(consistentView(context.actions).stream()
.filter(
action -> action.path.startsWith(thisPath) && action.path.substring(thisPath.length()).indexOf('/') == -1
&& action.operation == Operation.PUT)
.collect(
Collectors.toMap(
action -> action.path.substring(thisPath.length()),
action -> new PlainBlobMetaData(action.path.substring(thisPath.length()), action.data.length)));
action -> new PlainBlobMetaData(action.path.substring(thisPath.length()), action.data.length))));
}
}

Expand All @@ -272,9 +282,21 @@ public Map<String, BlobContainer> children() {

@Override
public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) {
return
listBlobs().entrySet().stream().filter(entry -> entry.getKey().startsWith(blobNamePrefix)).collect(
Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return maybeMissLatestIndexN(
listBlobs().entrySet().stream().filter(entry -> entry.getKey().startsWith(blobNamePrefix))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}

// On a coin flip, drops the /index-N blob of the currently tracked generation from a listing, so that tests verify that
// latestKnownRepoGen takes precedence over an inconsistent listing
private Map<String, BlobMetaData> maybeMissLatestIndexN(Map<String, BlobMetaData> listing) {
    // Listings are returned untouched unless this is the repo root, consistency has not been forced yet, and the coin
    // flip says so. The random source is only consulted when the first two conditions hold.
    if (path.parent() != null || context.consistent || random.nextBoolean() == false) {
        return listing;
    }
    final Map<String, BlobMetaData> withoutLatest = new HashMap<>(listing);
    final String latestIndexBlob = BlobStoreRepository.INDEX_FILE_PREFIX + latestKnownRepoGen.get();
    withoutLatest.remove(latestIndexBlob);
    return Collections.unmodifiableMap(withoutLatest);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void testReadAfterWriteConsistently() throws IOException {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
repository.start();
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
final String blobName = randomAlphaOfLength(10);
Expand All @@ -70,7 +70,7 @@ public void testReadAfterWriteAfterReadThrows() throws IOException {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
repository.start();
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
final String blobName = randomAlphaOfLength(10);
Expand All @@ -86,7 +86,7 @@ public void testReadAfterDeleteAfterWriteThrows() throws IOException {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
repository.start();
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
final String blobName = randomAlphaOfLength(10);
Expand All @@ -104,7 +104,7 @@ public void testOverwriteRandomBlobFails() throws IOException {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
repository.start();
final BlobContainer container = repository.blobStore().blobContainer(repository.basePath());
final String blobName = randomAlphaOfLength(10);
Expand All @@ -121,7 +121,7 @@ public void testOverwriteShardSnapBlobFails() throws IOException {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
repository.start();
final BlobContainer container =
repository.blobStore().blobContainer(repository.basePath().add("indices").add("someindex").add("0"));
Expand All @@ -143,7 +143,7 @@ public void testOverwriteSnapshotInfoBlob() {
new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10)));
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), threadPool, blobStoreContext)) {
xContentRegistry(), threadPool, blobStoreContext, random())) {
repository.start();

// We create a snap- blob for snapshot "foo" in the first generation
Expand Down