diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoriesIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoriesIT.java
index e72110f4c4efd..d946a9510cc16 100644
--- a/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoriesIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoriesIT.java
@@ -35,6 +35,8 @@
 import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
 import org.opensearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
 import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.index.IndexRequest;
 import org.opensearch.action.support.master.AcknowledgedResponse;
 import org.opensearch.client.Client;
 import org.opensearch.cluster.metadata.Metadata;
@@ -46,6 +48,7 @@
 import org.opensearch.repositories.RepositoriesService;
 import org.opensearch.repositories.RepositoryException;
 import org.opensearch.repositories.RepositoryVerificationException;
+import org.opensearch.repositories.blobstore.BlobStoreRepository;
 import org.opensearch.snapshots.mockstore.MockRepository;
 import org.opensearch.test.OpenSearchIntegTestCase;
 import org.opensearch.threadpool.ThreadPool;
@@ -328,4 +331,55 @@ public void testRepositoryVerification() throws Exception {
             assertThat(ex.getMessage(), containsString("is not shared"));
         }
     }
+
+    public void testSnapshotShardBlobDelete() throws Exception {
+        Client client = client();
+        Path repositoryPath = randomRepoPath();
+        final String repositoryName = "test-repo";
+        final String firstSnapshot = "first-snapshot";
+        final String secondSnapshot = "second-snapshot";
+        final String indexName = "test-idx";
+
+        logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath());
+        createRepository(
+            repositoryName,
+            "mock",
+            Settings.builder().put("location", repositoryPath).put(BlobStoreRepository.MAX_SHARD_BLOB_DELETE_BATCH_SIZE.getKey(), 10)
+        );
+
+        logger.info("--> creating index {} and ingesting data", indexName);
+        createIndex(indexName);
+        ensureGreen();
+        for (int j = 0; j < 10; j++) {
+            index(indexName, "_doc", Integer.toString(10 + j), "foo", "bar" + (10 + j));
+        }
+        refresh();
+
+        logger.info("--> creating first snapshot");
+        createFullSnapshot(repositoryName, firstSnapshot);
+
+        int numberOfFiles = numberOfFiles(repositoryPath);
+
+        logger.info("--> adding some more documents to test index");
+        for (int j = 0; j < 10; ++j) {
+            final BulkRequest bulkRequest = new BulkRequest();
+            for (int i = 0; i < 100; ++i) {
+                bulkRequest.add(new IndexRequest(indexName).source("foo" + j, "bar" + i));
+            }
+            client().bulk(bulkRequest).get();
+        }
+        refresh();
+
+        logger.info("--> creating second snapshot");
+        createFullSnapshot(repositoryName, secondSnapshot);
+
+        // Deleting the second snapshot should leave only the files belonging to the first one
+        logger.info("--> delete second snapshot");
+        client.admin().cluster().prepareDeleteSnapshot(repositoryName, secondSnapshot).get();
+
+        logger.info("--> make sure that number of files is back to what it was when the first snapshot was made");
+        assertFileCount(repositoryPath, numberOfFiles);
+
+        logger.info("--> done");
+    }
 }
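Usage note (not part of the patch): because the new setting is read from the repository's own metadata settings, it is supplied per repository at registration time, which is exactly what the test above does with a batch size of 10. A minimal sketch of doing the same outside the test framework, assuming a plain "fs" repository type and a placeholder location path:

// Illustrative sketch only: registers a repository with the new batch-size
// setting via the cluster admin client. The "fs" type, repository name, and
// location path below are placeholders, not values taken from the patch.
client.admin()
    .cluster()
    .preparePutRepository("my-repo")
    .setType("fs")
    .setSettings(
        Settings.builder()
            .put("location", "/tmp/my-repo")
            .put(BlobStoreRepository.MAX_SHARD_BLOB_DELETE_BATCH_SIZE.getKey(), 10)
    )
    .get();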
diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
index 6eaec491c8177..52b51629158f0 100644
--- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
@@ -235,6 +235,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
         Setting.Property.NodeScope
     );
 
+    /**
+     * Setting that sets the maximum batch size of stale shard blobs deleted in a single request.
+     */
+    public static final Setting<Integer> MAX_SHARD_BLOB_DELETE_BATCH_SIZE = Setting.intSetting(
+        "max_shard_blob_delete_batch_size",
+        1000,
+        Setting.Property.NodeScope
+    );
+
     /**
      * Setting to disable writing the {@code index.latest} blob which enables the contents of this repository to be used with a
      * url-repository.
@@ -243,6 +252,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
 
     protected final boolean supportURLRepo;
 
+    private final int maxShardBlobDeleteBatch;
+
     private final boolean compress;
 
     private final boolean cacheRepositoryData;
@@ -358,6 +369,7 @@ protected BlobStoreRepository(
         readOnly = metadata.settings().getAsBoolean("readonly", false);
         cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
         bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes());
+        maxShardBlobDeleteBatch = MAX_SHARD_BLOB_DELETE_BATCH_SIZE.get(metadata.settings());
     }
 
     @Override
@@ -902,15 +914,57 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
             listener.onResponse(null);
             return;
         }
-        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
-            try {
-                deleteFromContainer(blobContainer(), filesToDelete);
-                l.onResponse(null);
-            } catch (Exception e) {
-                logger.warn(() -> new ParameterizedMessage("{} Failed to delete some blobs during snapshot delete", snapshotIds), e);
-                throw e;
+
+        final BlockingQueue<List<String>> staleFilesToDeleteInBatch = new LinkedBlockingQueue<>();
+        final List<String> partition = new ArrayList<>();
+
+        try {
+            for (String key : filesToDelete) {
+                partition.add(key);
+                if (maxShardBlobDeleteBatch == partition.size()) {
+                    staleFilesToDeleteInBatch.add(new ArrayList<>(partition));
+                    partition.clear();
+                }
             }
-        }));
+            if (partition.isEmpty() == false) {
+                staleFilesToDeleteInBatch.add(new ArrayList<>(partition));
+            }
+            final GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(
+                ActionListener.wrap(r -> { listener.onResponse(null); }, listener::onFailure),
+                staleFilesToDeleteInBatch.size()
+            );
+
+            // Start at most as many workers as there are batches, bounded by the snapshot pool size
+            final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), staleFilesToDeleteInBatch.size());
+            for (int i = 0; i < workers; ++i) {
+                executeStaleShardDelete(staleFilesToDeleteInBatch, groupedListener);
+            }
+
+        } catch (Exception e) {
+            // TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream.
+            // Currently this catch exists as a stop gap solution to tackle unexpected runtime exceptions from implementations
+            // bubbling up and breaking the snapshot functionality.
+            assert false : e;
+            logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale shard blobs", snapshotIds), e);
+            listener.onFailure(e);
+        }
+    }
+
+    private void executeStaleShardDelete(BlockingQueue<List<String>> staleFilesToDeleteInBatch, GroupedActionListener<Void> listener)
+        throws InterruptedException {
+        List<String> filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS);
+        if (filesToDelete != null) {
+            threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
+                try {
+                    deleteFromContainer(blobContainer(), filesToDelete);
+                    l.onResponse(null);
+                } catch (Exception e) {
+                    logger.warn(() -> new ParameterizedMessage("[{}] Failed to delete blobs during snapshot delete", metadata.name()), e);
+                    throw e;
+                }
+                executeStaleShardDelete(staleFilesToDeleteInBatch, listener);
+            }));
+        }
     }
 
     // updates the shard state metadata for shards of a snapshot that is to be deleted. Also computes the files to be cleaned up.
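Taken together, asyncCleanupUnlinkedShardLevelBlobs now partitions the stale files into batches of at most max_shard_blob_delete_batch_size entries, and executeStaleShardDelete lets a bounded number of workers drain that queue, each worker re-submitting itself after finishing a batch. Below is a minimal, self-contained sketch of that producer/consumer pattern using a plain ExecutorService and a stubbed delete call; none of the names in it are OpenSearch API, and the pool size and batch size are arbitrary illustration values.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class BatchedDeleteSketch {

    // Stand-in for the real blob-container delete call.
    private static void deleteBatch(List<String> batch) {
        System.out.println("deleting " + batch.size() + " blobs");
    }

    // Mirrors executeStaleShardDelete: take one batch off the queue without
    // blocking; if one was available, process it on the pool and then try again.
    private static void drain(BlockingQueue<List<String>> queue, ExecutorService pool, CountDownLatch done) {
        final List<String> batch = queue.poll(); // non-blocking, like poll(0, MILLISECONDS)
        if (batch == null) {
            return;
        }
        pool.execute(() -> {
            try {
                deleteBatch(batch);
            } finally {
                done.countDown();
                drain(queue, pool, done); // pick up the next batch, if any remain
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        final int maxBatchSize = 10;
        final List<String> filesToDelete = new ArrayList<>();
        for (int i = 0; i < 95; i++) {
            filesToDelete.add("blob-" + i);
        }

        // Partition into batches of at most maxBatchSize, as the patch does:
        // 95 keys become nine batches of 10 plus one batch of 5.
        final BlockingQueue<List<String>> queue = new LinkedBlockingQueue<>();
        List<String> partition = new ArrayList<>();
        for (String key : filesToDelete) {
            partition.add(key);
            if (partition.size() == maxBatchSize) {
                queue.add(new ArrayList<>(partition));
                partition.clear();
            }
        }
        if (partition.isEmpty() == false) {
            queue.add(new ArrayList<>(partition));
        }

        // Start at most as many workers as there are batches, bounded by the pool size.
        final CountDownLatch done = new CountDownLatch(queue.size());
        final ExecutorService pool = Executors.newFixedThreadPool(4);
        final int workers = Math.min(4, queue.size());
        for (int i = 0; i < workers; i++) {
            drain(queue, pool, done);
        }
        done.await();
        pool.shutdown();
    }
}

The CountDownLatch stands in for the GroupedActionListener in the patch: both fire a single completion once every batch has reported in, regardless of which worker handled it.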