Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize stale blobs deletion during snapshot delete #3797

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.Metadata;
Expand All @@ -46,6 +48,7 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.RepositoryException;
import org.opensearch.repositories.RepositoryVerificationException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -328,4 +331,55 @@ public void testRepositoryVerification() throws Exception {
assertThat(ex.getMessage(), containsString("is not shared"));
}
}

public void testSnapshotShardBlobDelete() throws Exception {
Client client = client();
Path repositoryPath = randomRepoPath();
final String repositoryName = "test-repo";
final String firstSnapshot = "first-snapshot";
final String secondSnapshot = "second-snapshot";
final String indexName = "test-idx";

logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath());
createRepository(
"test-repo",
"mock",
Settings.builder().put("location", repositoryPath).put(BlobStoreRepository.MAX_SHARD_BLOB_DELETE_BATCH_SIZE.getKey(), 10)
);

logger.info("--> creating index-0 and ingest data");
createIndex(indexName);
ensureGreen();
for (int j = 0; j < 10; j++) {
index(indexName, "_doc", Integer.toString(10 + j), "foo", "bar" + 10 + j);
}
refresh();

logger.info("--> creating first snapshot");
createFullSnapshot(repositoryName, firstSnapshot);

int numberOfFiles = numberOfFiles(repositoryPath);

logger.info("--> adding some more documents to test index");
for (int j = 0; j < 10; ++j) {
final BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < 100; ++i) {
bulkRequest.add(new IndexRequest(indexName).source("foo" + j, "bar" + i));
}
client().bulk(bulkRequest).get();
}
refresh();

logger.info("--> creating second snapshot");
createFullSnapshot(repositoryName, secondSnapshot);

// Delete second snapshot
logger.info("--> delete second snapshot");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, secondSnapshot).get();

logger.info("--> make sure that number of files is back to what it was when the first snapshot was made");
assertFileCount(repositoryPath, numberOfFiles);

logger.info("--> done");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
Setting.Property.NodeScope
);

/**
* Setting to set batch size of stale blobs to be deleted.
*/
public static final Setting<Integer> MAX_SHARD_BLOB_DELETE_BATCH_SIZE = Setting.intSetting(
"max_shard_blob_delete_batch_size",
1000,
Setting.Property.NodeScope
);

/**
* Setting to disable writing the {@code index.latest} blob which enables the contents of this repository to be used with a
* url-repository.
Expand All @@ -243,6 +252,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

protected final boolean supportURLRepo;

private final int maxShardBlobDeleteBatch;

private final boolean compress;

private final boolean cacheRepositoryData;
Expand Down Expand Up @@ -358,6 +369,7 @@ protected BlobStoreRepository(
readOnly = metadata.settings().getAsBoolean("readonly", false);
cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes());
maxShardBlobDeleteBatch = MAX_SHARD_BLOB_DELETE_BATCH_SIZE.get(metadata.settings());
}

@Override
Expand Down Expand Up @@ -902,15 +914,57 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
listener.onResponse(null);
return;
}
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
try {
deleteFromContainer(blobContainer(), filesToDelete);
l.onResponse(null);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("{} Failed to delete some blobs during snapshot delete", snapshotIds), e);
throw e;

final BlockingQueue<List<String>> staleFilesToDeleteInBatch = new LinkedBlockingQueue<>();
final List<String> partition = new ArrayList<>();

try {
for (String key : filesToDelete) {
partition.add(key);
if (maxShardBlobDeleteBatch == partition.size()) {
staleFilesToDeleteInBatch.add(new ArrayList<>(partition));
partition.clear();
}
}
}));
if (partition.isEmpty() == false) {
staleFilesToDeleteInBatch.add(new ArrayList<>(partition));
}
final GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(
ActionListener.wrap(r -> { listener.onResponse(null); }, listener::onFailure),
staleFilesToDeleteInBatch.size()
);

// Start as many workers as fit into the snapshot pool at once at the most
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), staleFilesToDeleteInBatch.size());
for (int i = 0; i < workers; ++i) {
executeStaleShardDelete(staleFilesToDeleteInBatch, groupedListener);
}

} catch (Exception e) {
// TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream.
// Currently this catch exists as a stop gap solution to tackle unexpected runtime exceptions from implementations
// bubbling up and breaking the snapshot functionality.
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale shard blobs", snapshotIds), e);
listener.onFailure(e);
}
}

private void executeStaleShardDelete(BlockingQueue<List<String>> staleFilesToDeleteInBatch, GroupedActionListener<Void> listener)
throws InterruptedException {
List<String> filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS);
if (filesToDelete != null) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
try {
deleteFromContainer(blobContainer(), filesToDelete);
l.onResponse(null);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("{} Failed to delete blobs during snapshot delete", metadata.name()), e);
throw e;
}
executeStaleShardDelete(staleFilesToDeleteInBatch, listener);
}));
}
}

// updates the shard state metadata for shards of a snapshot that is to be deleted. Also computes the files to be cleaned up.
Expand Down