From 0274c0ec5b500318687f35d4452629b269aaa967 Mon Sep 17 00:00:00 2001
From: Sachin Kale
Date: Wed, 19 Jul 2023 17:19:18 +0530
Subject: [PATCH] [Remote Segment Store] Use files from segmentInfosSnapshot in upload (#8487) (#8772)

* Use files from segmentInfosSnapshot in upload

---------

Signed-off-by: Sachin Kale
Co-authored-by: Sachin Kale
---
 .../RemoteStoreBaseIntegTestCase.java         |   9 ++
 .../remotestore/RemoteStoreForceMergeIT.java  | 139 ++++++++++++++++++
 .../remotestore/RemoteStoreStatsIT.java       |  12 +-
 .../shard/RemoteStoreRefreshListener.java     |  93 +++++-------
 4 files changed, 185 insertions(+), 68 deletions(-)
 create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreForceMergeIT.java

diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java
index 10f01749ab4c5..3c463026281eb 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java
@@ -9,7 +9,9 @@
 package org.opensearch.remotestore;
 
 import org.junit.After;
+import org.opensearch.action.index.IndexResponse;
 import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.common.UUIDs;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.util.FeatureFlags;
 import org.opensearch.index.IndexModule;
@@ -49,6 +51,13 @@ public Settings indexSettings() {
         return defaultIndexSettings();
     }
 
+    IndexResponse indexSingleDoc(String indexName) {
+        return client().prepareIndex(indexName)
+            .setId(UUIDs.randomBase64UUID())
+            .setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
+            .get();
+    }
+
     private Settings defaultIndexSettings() {
         return Settings.builder()
             .put(super.indexSettings())
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreForceMergeIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreForceMergeIT.java
new file mode 100644
index 0000000000000..1e9458069d459
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreForceMergeIT.java
@@ -0,0 +1,139 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.remotestore;
+
+import org.junit.Before;
+import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.action.support.PlainActionFuture;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.plugins.Plugin;
+import org.opensearch.test.InternalTestCluster;
+import org.opensearch.test.OpenSearchIntegTestCase;
+import org.opensearch.test.transport.MockTransportService;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 3)
+public class RemoteStoreForceMergeIT extends RemoteStoreBaseIntegTestCase {
+
+    private static final String INDEX_NAME = "remote-store-test-idx-1";
+    private static final String TOTAL_OPERATIONS = "total-operations";
+    private static final String MAX_SEQ_NO_TOTAL = "max-seq-no-total";
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Arrays.asList(MockTransportService.TestPlugin.class);
+    }
+
+    @Before
+    public void setup() {
+        setupRepo();
+    }
+
+    @Override
+    public Settings indexSettings() {
+        return remoteStoreIndexSettings(0);
+    }
+
+    private Map<String, Long> indexData(int numberOfIterations, boolean invokeFlush, boolean flushAfterMerge, long deletedDocs) {
+        long totalOperations = 0;
+        long maxSeqNo = -1;
+        List<IndexResponse> indexResponseList = new ArrayList<>();
+        for (int i = 0; i < numberOfIterations; i++) {
+            int numberOfOperations = randomIntBetween(20, 50);
+            for (int j = 0; j < numberOfOperations; j++) {
+                IndexResponse response = indexSingleDoc(INDEX_NAME);
+                maxSeqNo = response.getSeqNo();
+                indexResponseList.add(response);
+            }
+            totalOperations += numberOfOperations;
+            if (invokeFlush) {
+                flush(INDEX_NAME);
+            } else {
+                refresh(INDEX_NAME);
+            }
+        }
+        if (deletedDocs == -1) {
+            deletedDocs = totalOperations;
+        }
+        int length = indexResponseList.size();
+        for (int j = 0; j < deletedDocs; j++) {
+            maxSeqNo = client().prepareDelete().setIndex(INDEX_NAME).setId(indexResponseList.get(length - j - 1).getId()).get().getSeqNo();
+        }
+        client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(flushAfterMerge).get();
+        refresh(INDEX_NAME);
+        assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), totalOperations - deletedDocs);
+        Map<String, Long> indexingStats = new HashMap<>();
+        indexingStats.put(TOTAL_OPERATIONS, totalOperations);
+        indexingStats.put(MAX_SEQ_NO_TOTAL, maxSeqNo);
+        return indexingStats;
+    }
+
+    private void verifyRestoredData(Map<String, Long> indexStats, long deletedDocs) {
+        ensureYellowAndNoInitializingShards(INDEX_NAME);
+        ensureGreen(INDEX_NAME);
+        assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) - deletedDocs);
+        IndexResponse response = indexSingleDoc(INDEX_NAME);
+        assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL) + 1, response.getSeqNo());
+        refresh(INDEX_NAME);
+        assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) + 1 - deletedDocs);
+    }
+
+    private void testRestoreWithMergeFlow(int numberOfIterations, boolean invokeFlush, boolean flushAfterMerge, long deletedDocs)
+        throws IOException {
+        createIndex(INDEX_NAME, remoteTranslogIndexSettings(0));
+        ensureYellowAndNoInitializingShards(INDEX_NAME);
+        ensureGreen(INDEX_NAME);
+
+        Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush, flushAfterMerge, deletedDocs);
+
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
+        assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
+
+        client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
+        ensureGreen(INDEX_NAME);
+
+        if (deletedDocs == -1) {
+            verifyRestoredData(indexStats, indexStats.get(TOTAL_OPERATIONS));
+        } else {
+            verifyRestoredData(indexStats, deletedDocs);
+        }
+    }
+
+    // The following integ tests use randomBoolean to control the number of integ tests. If we used separate
+    // values for each of the flags, the number of integ tests would become 16 instead of the current 2.
+    // We have run all 16 tests locally and they pass.
+    public void testRestoreForceMergeSingleIteration() throws IOException {
+        boolean invokeFlush = randomBoolean();
+        boolean flushAfterMerge = randomBoolean();
+        testRestoreWithMergeFlow(1, invokeFlush, flushAfterMerge, randomIntBetween(0, 10));
+    }
+
+    public void testRestoreForceMergeMultipleIterations() throws IOException {
+        boolean invokeFlush = randomBoolean();
+        boolean flushAfterMerge = randomBoolean();
+        testRestoreWithMergeFlow(randomIntBetween(2, 5), invokeFlush, flushAfterMerge, randomIntBetween(0, 10));
+    }
+
+    public void testRestoreForceMergeMultipleIterationsDeleteAll() throws IOException {
+        boolean invokeFlush = randomBoolean();
+        boolean flushAfterMerge = randomBoolean();
+        testRestoreWithMergeFlow(randomIntBetween(2, 3), invokeFlush, flushAfterMerge, -1);
+    }
+}
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java
index be495014e3155..38bab7c7e17c0 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java
@@ -12,9 +12,7 @@
 import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats;
 import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsRequestBuilder;
 import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
-import org.opensearch.action.index.IndexResponse;
 import org.opensearch.cluster.ClusterState;
-import org.opensearch.common.UUIDs;
 import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
 import org.opensearch.test.OpenSearchIntegTestCase;
@@ -135,7 +133,7 @@ private void indexDocs() {
             }
             int numberOfOperations = randomIntBetween(20, 50);
             for (int j = 0; j < numberOfOperations; j++) {
-                indexSingleDoc();
+                indexSingleDoc(INDEX_NAME);
             }
         }
     }
@@ -156,12 +154,4 @@ private void assertResponseStats(RemoteRefreshSegmentTracker.Stats stats) {
         assertTrue(stats.uploadBytesPerSecMovingAverage > 0);
         assertTrue(stats.uploadTimeMovingAverage > 0);
     }
-
-    private IndexResponse indexSingleDoc() {
-        return client().prepareIndex(INDEX_NAME)
-            .setId(UUIDs.randomBase64UUID())
-            .setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
-            .get();
-    }
-
 }
diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
index 912ba3828dbef..4a402ef397759 100644
--- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
+++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
@@ -12,7 +12,6 @@
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FilterDirectory;
@@ -40,13 +39,10 @@
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -216,67 +212,50 @@ private synchronized boolean syncSegments() {
                     long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
                     Collection<String> localSegmentsPostRefresh = segmentInfos.files(true);
-                    List<String> segmentInfosFiles = localSegmentsPostRefresh.stream()
-                        .filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
-                        .collect(Collectors.toList());
-                    Optional<String> latestSegmentInfos = segmentInfosFiles.stream()
-                        .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
-
-                    if (latestSegmentInfos.isPresent()) {
-                        // SegmentInfosSnapshot is a snapshot of reader's view of segments and may not contain
-                        // all the segments from last commit if they are merged away but not yet committed.
-                        // Each metadata file in the remote segment store represents a commit and the following
-                        // statement keeps sure that each metadata will always contain all the segments from last commit + refreshed
-                        // segments.
-                        localSegmentsPostRefresh.addAll(SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true));
-                        segmentInfosFiles.stream()
-                            .filter(file -> !file.equals(latestSegmentInfos.get()))
-                            .forEach(localSegmentsPostRefresh::remove);
-
-                        // Create a map of file name to size and update the refresh segment tracker
-                        updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
-                        CountDownLatch latch = new CountDownLatch(1);
-                        ActionListener<Void> segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() {
-                            @Override
-                            public void onResponse(Void unused) {
-                                try {
-                                    // Start metadata file upload
-                                    uploadMetadata(localSegmentsPostRefresh, segmentInfos);
-                                    clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
-                                    onSuccessfulSegmentsSync(
-                                        refreshTimeMs,
-                                        refreshClockTimeMs,
-                                        refreshSeqNo,
-                                        lastRefreshedCheckpoint,
-                                        checkpoint
-                                    );
-                                    // At this point since we have uploaded new segments, segment infos and segment metadata file,
-                                    // along with marking minSeqNoToKeep, upload has succeeded completely.
-                                    successful.set(true);
-                                } catch (Exception e) {
-                                    // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
-                                    // in the next refresh. This should not affect durability of the indexed data after remote trans-log
-                                    // integration.
-                                    logger.warn("Exception in post new segment upload actions", e);
-                                }
+                    // Create a map of file name to size and update the refresh segment tracker
+                    updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
+                    CountDownLatch latch = new CountDownLatch(1);
+                    ActionListener<Void> segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() {
+                        @Override
+                        public void onResponse(Void unused) {
+                            try {
+                                // Start metadata file upload
+                                uploadMetadata(localSegmentsPostRefresh, segmentInfos);
+                                clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
+                                onSuccessfulSegmentsSync(
+                                    refreshTimeMs,
+                                    refreshClockTimeMs,
+                                    refreshSeqNo,
+                                    lastRefreshedCheckpoint,
+                                    checkpoint
+                                );
+                                // At this point since we have uploaded new segments, segment infos and segment metadata file,
+                                // along with marking minSeqNoToKeep, upload has succeeded completely.
+                                successful.set(true);
+                            } catch (Exception e) {
+                                // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
+                                // as part of exponential back-off retry logic. This should not affect durability of the indexed data
+                                // with remote trans-log integration.
+                                logger.warn("Exception in post new segment upload actions", e);
                             }
+                        }
 
-                            @Override
-                            public void onFailure(Exception e) {
-                                logger.warn("Exception while uploading new segments to the remote segment store", e);
-                            }
-                        }, latch);
+                        @Override
+                        public void onFailure(Exception e) {
+                            logger.warn("Exception while uploading new segments to the remote segment store", e);
+                        }
+                    }, latch);
 
-                        // Start the segments files upload
-                        uploadNewSegments(localSegmentsPostRefresh, segmentUploadsCompletedListener);
-                        latch.await();
-                    }
+                    // Start the segments files upload
+                    uploadNewSegments(localSegmentsPostRefresh, segmentUploadsCompletedListener);
+                    latch.await();
                 } catch (EngineException e) {
                     logger.warn("Exception while reading SegmentInfosSnapshot", e);
                 }
             } catch (IOException e) {
                 // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
-                // in the next refresh. This should not affect durability of the indexed data after remote trans-log integration.
+                // as part of exponential back-off retry logic. This should not affect durability of the indexed data
+                // with remote trans-log integration.
                 logger.warn("Exception while uploading new segments to the remote segment store", e);
             }
         } catch (Throwable t) {