From a1e60d35c6cb7f83dd6c18795e3eb8a7bc8d6cb6 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Wed, 31 Aug 2022 13:52:59 -0700 Subject: [PATCH 01/10] Segment Replication - Fix NoSuchFileException errors caused when computing metadata snapshot on primary shards. This change fixes the errors that occur when computing metadata snapshots on primary shards from the latest in-memory SegmentInfos. The error occurs when a segments_N file that is referenced by the in-memory infos is deleted as part of a concurrent commit. The segments themselves are incref'd by IndexWriter.incRefDeleter but the commit file (Segments_N) is not. This change resolves this by ignoring the segments_N file when computing metadata for CopyState and only sending incref'd segment files to replicas. Signed-off-by: Marc Handalian --- CHANGELOG.md | 1 + .../replication/SegmentReplicationIT.java | 50 +++++++++++++++- .../org/opensearch/index/store/Store.java | 38 +++++++++--- .../replication/CheckpointInfoResponse.java | 26 +++----- .../SegmentReplicationSourceService.java | 7 +-- .../replication/SegmentReplicationTarget.java | 24 ++------ .../indices/replication/common/CopyState.java | 31 +++------- .../SegmentReplicationIndexShardTests.java | 7 +-- .../opensearch/index/store/StoreTests.java | 59 ++++++++++++++----- .../OngoingSegmentReplicationsTests.java | 28 +++++---- .../SegmentReplicationSourceHandlerTests.java | 8 ++- .../SegmentReplicationSourceServiceTests.java | 2 - .../SegmentReplicationTargetServiceTests.java | 2 +- .../SegmentReplicationTargetTests.java | 18 +++--- .../replication/common/CopyStateTests.java | 10 +--- .../index/shard/IndexShardTestCase.java | 8 +-- 16 files changed, 178 insertions(+), 141 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0930923805d96..251ca174a4e05 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fix flaky random test `NRTReplicationEngineTests.testUpdateSegments` ([#4352](https://github.com/opensearch-project/OpenSearch/pull/4352)) - [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386)) - [Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica ([#4363](https://github.com/opensearch-project/OpenSearch/pull/4363)) +- Fix NoSuchFileExceptions with segment replication when computing primary metadata snapshots ([#4366](https://github.com/opensearch-project/OpenSearch/pull/4366)) ### Security - CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341)) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 16e9d78b17826..9b2ab753832d3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -9,7 +9,6 @@ package org.opensearch.indices.replication; import com.carrotsearch.randomizedtesting.RandomizedTest; -import org.apache.lucene.index.SegmentInfos; import org.junit.BeforeClass; import org.opensearch.action.admin.indices.segments.IndexShardSegments; import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; @@ -586,13 +585,56 @@ private void assertSegmentStats(int numberOfReplicas) throws IOException { ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState(); final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId()); IndexShard indexShard = getIndexShard(replicaNode.getName()); - final String lastCommitSegmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(indexShard.store().directory()); // calls to readCommit will fail if a valid commit point and all its segments are not in the store. - SegmentInfos.readCommit(indexShard.store().directory(), lastCommitSegmentsFileName); + indexShard.store().readLastCommittedSegmentsInfo(); } } } + public void testDropPrimaryDuringReplication() throws Exception { + final Settings settings = Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); + final String primaryNode = internalCluster().startDataOnlyNode(Settings.EMPTY); + createIndex(INDEX_NAME, settings); + internalCluster().startDataOnlyNodes(6); + ensureGreen(INDEX_NAME); + + int initialDocCount = scaledRandomIntBetween(100, 200); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + refresh(INDEX_NAME); + // don't wait for replication to complete, stop the primary immediately. + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); + ensureYellow(INDEX_NAME); + + // start another replica. + internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + + // index another doc and refresh - without this the new replica won't catch up. + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get(); + + flushAndRefresh(INDEX_NAME); + waitForReplicaUpdate(); + assertSegmentStats(6); + } + } + /** * Waits until the replica is caught up to the latest primary segments gen. * @throws Exception if assertion fails @@ -611,10 +653,12 @@ private void waitForReplicaUpdate() throws Exception { final List replicaShardSegments = segmentListMap.get(false); // if we don't have any segments yet, proceed. final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); + logger.debug("Primary Segments: {}", primaryShardSegments.getSegments()); if (primaryShardSegments.getSegments().isEmpty() == false) { final Map latestPrimarySegments = getLatestSegments(primaryShardSegments); final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get(); for (ShardSegments shardSegments : replicaShardSegments) { + logger.debug("Replica {} Segments: {}", shardSegments.getShardRouting(), shardSegments.getSegments()); final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments() .stream() .anyMatch(segment -> segment.getGeneration() == latestPrimaryGen); diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 58598ab2d08f4..de96ad03e2423 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -105,6 +105,7 @@ import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -122,6 +123,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; +import static org.opensearch.index.store.Store.MetadataSnapshot.loadMetadata; /** * A Store provides plain access to files written by an opensearch index shard. Each shard @@ -334,6 +336,16 @@ public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOExceptio return new MetadataSnapshot(segmentInfos, directory, logger); } + /** + * Fetch a map of StoreFileMetadata for segments, ignoring commit points. + * @param segmentInfos {@link SegmentInfos} from which to compute metadata. + * @return {@link Map} map file name to {@link StoreFileMetadata}. + */ + public Map getSegmentMetadataMap(SegmentInfos segmentInfos) throws IOException { + assert indexSettings.isSegRepEnabled(); + return loadMetadata(segmentInfos, directory, logger, true).fileMetadata; + } + /** * Renames all the given files from the key of the map to the * value of the map. All successfully renamed files are removed from the map in-place. @@ -709,31 +721,34 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr } /** - * This method deletes every file in this store that is not contained in either the remote or local metadata snapshots. + * Segment Replication method - + * This method deletes every file in this store that is not referenced by the passed in SegmentInfos or + * part of the latest on-disk commit point. * This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file. * In this case files from both snapshots must be preserved. Verification has been done that all files are present on disk. * @param reason the reason for this cleanup operation logged for each deleted file - * @param localSnapshot The local snapshot from in memory SegmentInfos. + * @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present. * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. */ - public void cleanupAndPreserveLatestCommitPoint(String reason, MetadataSnapshot localSnapshot) throws IOException { + public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos) throws IOException { + assert indexSettings.isSegRepEnabled(); // fetch a snapshot from the latest on disk Segments_N file. This can be behind // the passed in local in memory snapshot, so we want to ensure files it references are not removed. metadataLock.writeLock().lock(); try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - cleanupFiles(reason, localSnapshot, getMetadata(readLastCommittedSegmentsInfo())); + cleanupFiles(reason, getMetadata(readLastCommittedSegmentsInfo()), infos.files(true)); } finally { metadataLock.writeLock().unlock(); } } - private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable MetadataSnapshot additionalSnapshot) + private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable Collection additionalFiles) throws IOException { assert metadataLock.isWriteLockedByCurrentThread(); for (String existingFile : directory.listAll()) { if (Store.isAutogenerated(existingFile) || localSnapshot.contains(existingFile) - || (additionalSnapshot != null && additionalSnapshot.contains(existingFile))) { + || (additionalFiles != null && additionalFiles.contains(existingFile))) { // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete // checksum) continue; @@ -1033,6 +1048,11 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg } static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException { + return loadMetadata(segmentInfos, directory, logger, false); + } + + static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger, boolean ignoreSegmentsFile) + throws IOException { long numDocs = Lucene.getNumDocs(segmentInfos); Map commitUserDataBuilder = new HashMap<>(); commitUserDataBuilder.putAll(segmentInfos.getUserData()); @@ -1067,8 +1087,10 @@ static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory director if (maxVersion == null) { maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion; } - final String segmentsFile = segmentInfos.getSegmentsFileName(); - checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true); + if (ignoreSegmentsFile == false) { + final String segmentsFile = segmentInfos.getSegmentsFileName(); + checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true); + } return new LoadedMetadata(unmodifiableMap(builder), unmodifiableMap(commitUserDataBuilder), numDocs); } diff --git a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java index a73a3b54184da..d2baca49b3c27 100644 --- a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java +++ b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java @@ -10,13 +10,12 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.transport.TransportResponse; import java.io.IOException; -import java.util.Set; +import java.util.Map; /** * Response returned from a {@link SegmentReplicationSource} that includes the file metadata, and SegmentInfos @@ -28,52 +27,41 @@ public class CheckpointInfoResponse extends TransportResponse { private final ReplicationCheckpoint checkpoint; - private final Store.MetadataSnapshot snapshot; + private final Map snapshot; private final byte[] infosBytes; - // pendingDeleteFiles are segments that have been merged away in the latest in memory SegmentInfos - // but are still referenced by the latest commit point (Segments_N). - private final Set pendingDeleteFiles; public CheckpointInfoResponse( final ReplicationCheckpoint checkpoint, - final Store.MetadataSnapshot snapshot, - final byte[] infosBytes, - final Set additionalFiles + final Map snapshot, + final byte[] infosBytes ) { this.checkpoint = checkpoint; this.snapshot = snapshot; this.infosBytes = infosBytes; - this.pendingDeleteFiles = additionalFiles; } public CheckpointInfoResponse(StreamInput in) throws IOException { this.checkpoint = new ReplicationCheckpoint(in); - this.snapshot = new Store.MetadataSnapshot(in); + this.snapshot = in.readMap(StreamInput::readString, StoreFileMetadata::new); this.infosBytes = in.readByteArray(); - this.pendingDeleteFiles = in.readSet(StoreFileMetadata::new); } @Override public void writeTo(StreamOutput out) throws IOException { checkpoint.writeTo(out); - snapshot.writeTo(out); + out.writeMap(snapshot, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut)); out.writeByteArray(infosBytes); - out.writeCollection(pendingDeleteFiles); } public ReplicationCheckpoint getCheckpoint() { return checkpoint; } - public Store.MetadataSnapshot getSnapshot() { + public Map getSnapshot() { return snapshot; } public byte[] getInfosBytes() { return infosBytes; } - - public Set getPendingDeleteFiles() { - return pendingDeleteFiles; - } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index db3f87201b774..91b8243440ac5 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -133,12 +133,7 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan ); final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter); channel.sendResponse( - new CheckpointInfoResponse( - copyState.getCheckpoint(), - copyState.getMetadataSnapshot(), - copyState.getInfosBytes(), - copyState.getPendingDeleteFiles() - ) + new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); timer.stop(); logger.trace( diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 7c28406036ddd..e1e5a6afa0002 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -37,12 +37,8 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; +import java.util.Collections; /** * Represents the target of a replication event. @@ -178,7 +174,7 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener filesToFetch = new ArrayList(diff.missing); - Set storeFiles = new HashSet<>(Arrays.asList(store.directory().listAll())); - final Set pendingDeleteFiles = checkpointInfo.getPendingDeleteFiles() - .stream() - .filter(f -> storeFiles.contains(f.name()) == false) - .collect(Collectors.toSet()); - - filesToFetch.addAll(pendingDeleteFiles); - logger.trace("Files to fetch {}", filesToFetch); - - for (StoreFileMetadata file : filesToFetch) { + for (StoreFileMetadata file : diff.missing) { state.getIndex().addFileDetail(file.name(), file.length(), false); } // always send a req even if not fetching files so the primary can clear the copyState for this shard. state.setStage(SegmentReplicationState.Stage.GET_FILES); cancellableThreads.checkForCancel(); - source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, store, getFilesListener); + source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), diff.missing, store, getFilesListener); } private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, ActionListener listener) { @@ -231,7 +217,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, responseCheckpoint.getSegmentsGen() ); indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo()); - store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", store.getMetadata(infos)); + store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infos); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. // this means we transferred files from the remote that have not be checksummed and they are diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java index c0e0b4dee2b3f..9ded0125b7c17 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java @@ -8,6 +8,8 @@ package org.opensearch.indices.replication.common; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.ByteBuffersDataOutput; @@ -15,14 +17,12 @@ import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.util.concurrent.AbstractRefCounted; import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.HashSet; -import java.util.Set; +import java.util.Map; /** * An Opensearch-specific version of Lucene's CopyState class that @@ -37,11 +37,11 @@ public class CopyState extends AbstractRefCounted { private final ReplicationCheckpoint requestedReplicationCheckpoint; /** Actual ReplicationCheckpoint returned by the shard */ private final ReplicationCheckpoint replicationCheckpoint; - private final Store.MetadataSnapshot metadataSnapshot; - private final HashSet pendingDeleteFiles; + private final Map metadataMap; private final byte[] infosBytes; private GatedCloseable commitRef; private final IndexShard shard; + public static final Logger logger = LogManager.getLogger(CopyState.class); public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShard shard) throws IOException { super("CopyState-" + shard.shardId()); @@ -49,7 +49,7 @@ public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShar this.shard = shard; this.segmentInfosRef = shard.getSegmentInfosSnapshot(); SegmentInfos segmentInfos = this.segmentInfosRef.get(); - this.metadataSnapshot = shard.store().getMetadata(segmentInfos); + this.metadataMap = shard.store().getSegmentMetadataMap(segmentInfos); this.replicationCheckpoint = new ReplicationCheckpoint( shard.shardId(), shard.getOperationPrimaryTerm(), @@ -57,18 +57,7 @@ public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShar shard.getProcessedLocalCheckpoint(), segmentInfos.getVersion() ); - - // Send files that are merged away in the latest SegmentInfos but not in the latest on disk Segments_N. - // This ensures that the store on replicas is in sync with the store on primaries. this.commitRef = shard.acquireLastIndexCommit(false); - Store.MetadataSnapshot metadata = shard.store().getMetadata(this.commitRef.get()); - final Store.RecoveryDiff diff = metadata.recoveryDiff(this.metadataSnapshot); - this.pendingDeleteFiles = new HashSet<>(diff.missing); - if (this.pendingDeleteFiles.isEmpty()) { - // If there are no additional files we can release the last commit immediately. - this.commitRef.close(); - this.commitRef = null; - } ByteBuffersDataOutput buffer = new ByteBuffersDataOutput(); // resource description and name are not used, but resource description cannot be null @@ -95,18 +84,14 @@ public ReplicationCheckpoint getCheckpoint() { return replicationCheckpoint; } - public Store.MetadataSnapshot getMetadataSnapshot() { - return metadataSnapshot; + public Map getMetadataMap() { + return metadataMap; } public byte[] getInfosBytes() { return infosBytes; } - public Set getPendingDeleteFiles() { - return pendingDeleteFiles; - } - public IndexShard getShard() { return shard; } diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 88a3bdad53d0c..3af882a8087ec 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -489,12 +489,7 @@ private void resolveCheckpointInfoResponseListener(ActionListener metadataSnapshot = store.getSegmentMetadataMap(store.readLastCommittedSegmentsInfo()); + // no docs indexed only _N file exists. + assertTrue(metadataSnapshot.isEmpty()); + + // commit some docs to create segments. + commitRandomDocs(store); + + final Map snapshotAfterCommit = store.getSegmentMetadataMap(store.readLastCommittedSegmentsInfo()); + assertFalse(snapshotAfterCommit.isEmpty()); + assertFalse(snapshotAfterCommit.keySet().stream().anyMatch((name) -> name.startsWith(IndexFileNames.SEGMENTS))); + store.close(); + } + + private void commitRandomDocs(Store store) throws IOException { IndexWriterConfig indexWriterConfig = newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec( TestUtil.getDefaultCodec() ); @@ -1173,19 +1214,5 @@ public void testcleanupAndPreserveLatestCommitPoint() throws IOException { writer.addDocument(doc); writer.commit(); writer.close(); - - Store.MetadataSnapshot commitMetadata = store.getMetadata(); - - Store.MetadataSnapshot refreshMetadata = Store.MetadataSnapshot.EMPTY; - - store.cleanupAndPreserveLatestCommitPoint("test", refreshMetadata); - - // we want to ensure commitMetadata files are preserved after calling cleanup - for (String existingFile : store.directory().listAll()) { - assert (commitMetadata.contains(existingFile) == true); - } - - deleteContent(store.directory()); - IOUtils.close(store); } } diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java index f49ee0471b5e8..bd3106454f49b 100644 --- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -11,28 +11,30 @@ import org.junit.Assert; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.IndexService; +import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.shard.ShardId; -import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; -import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -55,15 +57,18 @@ public class OngoingSegmentReplicationsTests extends IndexShardTestCase { private GetSegmentFilesRequest getSegmentFilesRequest; - final Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build(); + final Settings settings = Settings.builder() + .put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); @Override public void setUp() throws Exception { super.setUp(); - primary = newStartedShard(true); - replica = newShard(primary.shardId(), false); + primary = newStartedShard(true, settings); + replica = newShard(false, settings, new NRTReplicationEngineFactory()); recoverReplica(replica, primary, true); replicaDiscoveryNode = replica.recoveryState().getTargetNode(); primaryDiscoveryNode = replica.recoveryState().getSourceNode(); @@ -93,6 +98,8 @@ public void tearDown() throws Exception { } public void testPrepareAndSendSegments() throws IOException { + indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); + primary.refresh("Test"); OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings)); final CheckpointInfoRequest request = new CheckpointInfoRequest( 1L, @@ -112,17 +119,14 @@ public void testPrepareAndSendSegments() throws IOException { 1L, replica.routingEntry().allocationId().getId(), replicaDiscoveryNode, - new ArrayList<>(copyState.getMetadataSnapshot().asMap().values()), + new ArrayList<>(copyState.getMetadataMap().values()), testCheckpoint ); - final Collection expectedFiles = List.copyOf(primary.store().getMetadata().asMap().values()); replications.startSegmentCopy(getSegmentFilesRequest, new ActionListener<>() { @Override public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { - assertEquals(1, getSegmentFilesResponse.files.size()); - assertEquals(1, expectedFiles.size()); - assertTrue(expectedFiles.stream().findFirst().get().isSame(getSegmentFilesResponse.files.get(0))); + assertEquals(copyState.getMetadataMap().size(), getSegmentFilesResponse.files.size()); assertEquals(0, copyState.refCount()); assertFalse(replications.isInCopyStateMap(request.getCheckpoint())); assertEquals(0, replications.size()); @@ -181,7 +185,7 @@ public void testCancelReplication_AfterSendFilesStarts() throws IOException, Int 1L, replica.routingEntry().allocationId().getId(), replicaDiscoveryNode, - new ArrayList<>(copyState.getMetadataSnapshot().asMap().values()), + new ArrayList<>(copyState.getMetadataMap().values()), testCheckpoint ); replications.startSegmentCopy(getSegmentFilesRequest, new ActionListener<>() { diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java index 5f6ec7e505805..cde5cd980a91d 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -19,6 +19,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; +import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.store.StoreFileMetadata; @@ -76,7 +77,7 @@ public void testSendFiles() throws IOException { 1 ); - final List expectedFiles = List.copyOf(copyState.getMetadataSnapshot().asMap().values()); + final List expectedFiles = List.copyOf(copyState.getMetadataMap().values()); final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, @@ -137,6 +138,9 @@ public void onFailure(Exception e) { } public void testSendFileFails() throws IOException { + // index some docs on the primary so a segment is created. + indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); + primary.refresh("Test"); chunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> listener.onFailure( new OpenSearchException("Test") ); @@ -153,7 +157,7 @@ public void testSendFileFails() throws IOException { 1 ); - final List expectedFiles = List.copyOf(copyState.getMetadataSnapshot().asMap().values()); + final List expectedFiles = List.copyOf(copyState.getMetadataMap().values()); final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java index 4bfdd81d50a1e..dcb6b6dad32dd 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java @@ -121,8 +121,6 @@ public void testCheckpointInfo() { public void onResponse(CheckpointInfoResponse response) { assertEquals(testCheckpoint, response.getCheckpoint()); assertNotNull(response.getInfosBytes()); - // CopyStateTests sets up one pending delete file and one committed segments file - assertEquals(1, response.getPendingDeleteFiles().size()); assertEquals(1, response.getSnapshot().size()); } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 1d253b0a9a300..a674ab6151f79 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -59,7 +59,7 @@ public void setUp() throws Exception { .put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()) .build(); final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - primaryShard = newStartedShard(true); + primaryShard = newStartedShard(true, settings); replicaShard = newShard(false, settings, new NRTReplicationEngineFactory()); recoverReplica(replicaShard, primaryShard, true); checkpoint = new ReplicationCheckpoint(replicaShard.shardId(), 0L, 0L, 0L, 0L); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 11217a46b3c69..93b0f46115fed 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -51,7 +51,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.Random; import java.util.Arrays; @@ -135,7 +134,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT.asMap(), buffer.toArrayCopy())); } @Override @@ -146,9 +145,8 @@ public void getSegmentFiles( Store store, ActionListener listener ) { - assertEquals(filesToFetch.size(), 2); + assertEquals(1, filesToFetch.size()); assert (filesToFetch.contains(SEGMENTS_FILE)); - assert (filesToFetch.contains(PENDING_DELETE_FILE)); listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); } }; @@ -230,7 +228,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT.asMap(), buffer.toArrayCopy())); } @Override @@ -273,7 +271,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT.asMap(), buffer.toArrayCopy())); } @Override @@ -318,7 +316,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT.asMap(), buffer.toArrayCopy())); } @Override @@ -362,7 +360,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT.asMap(), buffer.toArrayCopy())); } @Override @@ -413,9 +411,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse( - new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1), buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE)) - ); + listener.onResponse(new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1).asMap(), buffer.toArrayCopy())); } @Override diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java index a6f0cf7e98411..77a4a6d22039e 100644 --- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -22,7 +22,6 @@ import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import java.io.IOException; -import java.util.Set; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -32,6 +31,7 @@ public class CopyStateTests extends IndexShardTestCase { private static final long EXPECTED_LONG_VALUE = 1L; private static final ShardId TEST_SHARD_ID = new ShardId("testIndex", "testUUID", 0); private static final StoreFileMetadata SEGMENTS_FILE = new StoreFileMetadata(IndexFileNames.SEGMENTS, 1L, "0", Version.LATEST); + private static final StoreFileMetadata SEGMENT_FILE = new StoreFileMetadata("_0.si", 1L, "0", Version.LATEST); private static final StoreFileMetadata PENDING_DELETE_FILE = new StoreFileMetadata("pendingDelete.del", 1L, "1", Version.LATEST); private static final Store.MetadataSnapshot COMMIT_SNAPSHOT = new Store.MetadataSnapshot( @@ -41,7 +41,7 @@ public class CopyStateTests extends IndexShardTestCase { ); private static final Store.MetadataSnapshot SI_SNAPSHOT = new Store.MetadataSnapshot( - Map.of(SEGMENTS_FILE.name(), SEGMENTS_FILE), + Map.of(SEGMENT_FILE.name(), SEGMENT_FILE), null, 0 ); @@ -61,10 +61,6 @@ public void testCopyStateCreation() throws IOException { // version was never set so this should be zero assertEquals(0, checkpoint.getSegmentInfosVersion()); assertEquals(EXPECTED_LONG_VALUE, checkpoint.getPrimaryTerm()); - - Set pendingDeleteFiles = copyState.getPendingDeleteFiles(); - assertEquals(1, pendingDeleteFiles.size()); - assertTrue(pendingDeleteFiles.contains(PENDING_DELETE_FILE)); } public static IndexShard createMockIndexShard() throws IOException { @@ -78,7 +74,7 @@ public static IndexShard createMockIndexShard() throws IOException { SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major); when(mockShard.getSegmentInfosSnapshot()).thenReturn(new GatedCloseable<>(testSegmentInfos, () -> {})); - when(mockStore.getMetadata(testSegmentInfos)).thenReturn(SI_SNAPSHOT); + when(mockStore.getSegmentMetadataMap(testSegmentInfos)).thenReturn(SI_SNAPSHOT.asMap()); IndexCommit mockIndexCommit = mock(IndexCommit.class); when(mockShard.acquireLastIndexCommit(false)).thenReturn(new GatedCloseable<>(mockIndexCommit, () -> {})); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 1b40cb4f2dfa3..3302ffd810bd4 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -134,6 +134,7 @@ import java.io.IOException; import java.util.ArrayList; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -1200,12 +1201,7 @@ public void getCheckpointMetadata( try { final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primaryShard.shardId), primaryShard); listener.onResponse( - new CheckpointInfoResponse( - copyState.getCheckpoint(), - copyState.getMetadataSnapshot(), - copyState.getInfosBytes(), - copyState.getPendingDeleteFiles() - ) + new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); } catch (IOException e) { logger.error("Unexpected error computing CopyState", e); From f424567ffb749e32cf78281a390f26ecbd7bf7ea Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Thu, 1 Sep 2022 00:19:09 -0700 Subject: [PATCH 02/10] Fix spotless. Signed-off-by: Marc Handalian --- .../opensearch/index/store/StoreTests.java | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index 220032f090ee7..075209173c206 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -124,10 +124,10 @@ public class StoreTests extends OpenSearchTestCase { Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build() ); - IndexSettings SEGMENT_REPLICATION_INDEX_SETTINGS = new IndexSettings(INDEX_SETTINGS.getIndexMetadata(), Settings.builder() - .put(INDEX_SETTINGS.getSettings()) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .build()); + IndexSettings SEGMENT_REPLICATION_INDEX_SETTINGS = new IndexSettings( + INDEX_SETTINGS.getIndexMetadata(), + Settings.builder().put(INDEX_SETTINGS.getSettings()).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build() + ); private static final Version MIN_SUPPORTED_LUCENE_VERSION = org.opensearch.Version.CURRENT .minimumIndexCompatibilityVersion().luceneVersion; @@ -1160,7 +1160,12 @@ public void testGetMetadataWithSegmentInfos() throws IOException { public void testCleanupAndPreserveLatestCommitPoint() throws IOException { final ShardId shardId = new ShardId("index", "_na_", 1); - Store store = new Store(shardId, SEGMENT_REPLICATION_INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId)); + Store store = new Store( + shardId, + SEGMENT_REPLICATION_INDEX_SETTINGS, + StoreTests.newDirectory(random()), + new DummyShardLock(shardId) + ); commitRandomDocs(store); Store.MetadataSnapshot commitMetadata = store.getMetadata(); @@ -1178,7 +1183,12 @@ public void testCleanupAndPreserveLatestCommitPoint() throws IOException { public void testGetSegmentMetadataMap() throws IOException { final ShardId shardId = new ShardId("index", "_na_", 1); - Store store = new Store(shardId, SEGMENT_REPLICATION_INDEX_SETTINGS, new NIOFSDirectory(createTempDir()), new DummyShardLock(shardId)); + Store store = new Store( + shardId, + SEGMENT_REPLICATION_INDEX_SETTINGS, + new NIOFSDirectory(createTempDir()), + new DummyShardLock(shardId) + ); store.createEmpty(Version.LATEST); final Map metadataSnapshot = store.getSegmentMetadataMap(store.readLastCommittedSegmentsInfo()); // no docs indexed only _N file exists. From f1ae2ddb5bb5c9319f8f10b9483e619633246492 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Sun, 4 Sep 2022 13:50:42 -0700 Subject: [PATCH 03/10] Update StoreTests.testCleanupAndPreserveLatestCommitPoint to assert additional segments are deleted. Signed-off-by: Marc Handalian --- .../org/opensearch/index/store/Store.java | 2 +- .../opensearch/index/store/StoreTests.java | 31 ++++++++++++++++--- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index de96ad03e2423..f0c1e92d90d64 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -337,7 +337,7 @@ public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOExceptio } /** - * Fetch a map of StoreFileMetadata for segments, ignoring commit points. + * Segment Replication method - Fetch a map of StoreFileMetadata for segments, ignoring Segment_N files. * @param segmentInfos {@link SegmentInfos} from which to compute metadata. * @return {@link Map} map file name to {@link StoreFileMetadata}. */ diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index 075209173c206..507712312e8e8 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -50,6 +50,7 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SnapshotDeletionPolicy; +import org.apache.lucene.index.StandardDirectoryReader; import org.apache.lucene.index.Term; import org.apache.lucene.store.ByteBuffersDirectory; import org.apache.lucene.store.ChecksumIndexInput; @@ -1170,13 +1171,27 @@ public void testCleanupAndPreserveLatestCommitPoint() throws IOException { Store.MetadataSnapshot commitMetadata = store.getMetadata(); + // index more docs but only IW.flush, this will create additional files we'll clean up. + final IndexWriter writer = indexRandomDocs(store); + writer.flush(); + writer.close(); + + final List additionalSegments = new ArrayList<>(); + for (String file : store.directory().listAll()) { + if (commitMetadata.contains(file) == false) { + additionalSegments.add(file); + } + } + assertFalse(additionalSegments.isEmpty()); + + // clean up everything not in the latest commit point. store.cleanupAndPreserveLatestCommitPoint("test", store.readLastCommittedSegmentsInfo()); // we want to ensure commitMetadata files are preserved after calling cleanup for (String existingFile : store.directory().listAll()) { - assert (commitMetadata.contains(existingFile) == true); + assertTrue(commitMetadata.contains(existingFile)); + assertFalse(additionalSegments.contains(existingFile)); } - deleteContent(store.directory()); IOUtils.close(store); } @@ -1194,7 +1209,7 @@ public void testGetSegmentMetadataMap() throws IOException { // no docs indexed only _N file exists. assertTrue(metadataSnapshot.isEmpty()); - // commit some docs to create segments. + // commit some docs to create a commit point. commitRandomDocs(store); final Map snapshotAfterCommit = store.getSegmentMetadataMap(store.readLastCommittedSegmentsInfo()); @@ -1204,9 +1219,16 @@ public void testGetSegmentMetadataMap() throws IOException { } private void commitRandomDocs(Store store) throws IOException { + IndexWriter writer = indexRandomDocs(store); + writer.commit(); + writer.close(); + } + + private IndexWriter indexRandomDocs(Store store) throws IOException { IndexWriterConfig indexWriterConfig = newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec( TestUtil.getDefaultCodec() ); + indexWriterConfig.setCommitOnClose(false); indexWriterConfig.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE); IndexWriter writer = new IndexWriter(store.directory(), indexWriterConfig); int docs = 1 + random().nextInt(100); @@ -1222,7 +1244,6 @@ private void commitRandomDocs(Store store) throws IOException { ); doc.add(new SortedDocValuesField("dv", new BytesRef(TestUtil.randomRealisticUnicodeString(random())))); writer.addDocument(doc); - writer.commit(); - writer.close(); + return writer; } } From 6e253965fd9823e0e64d4bc742d75e87ea6c549e Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Sun, 4 Sep 2022 13:52:07 -0700 Subject: [PATCH 04/10] Rename snapshot to metadataMap in CheckpointInfoResponse. Signed-off-by: Marc Handalian --- .../replication/CheckpointInfoResponse.java | 14 +++---- .../SegmentReplicationSourceService.java | 40 ++++++++++++------- .../replication/SegmentReplicationTarget.java | 2 +- .../SegmentReplicationSourceServiceTests.java | 2 +- 4 files changed, 35 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java index d2baca49b3c27..48c2dfd30f589 100644 --- a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java +++ b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java @@ -27,29 +27,29 @@ public class CheckpointInfoResponse extends TransportResponse { private final ReplicationCheckpoint checkpoint; - private final Map snapshot; + private final Map metadataMap; private final byte[] infosBytes; public CheckpointInfoResponse( final ReplicationCheckpoint checkpoint, - final Map snapshot, + final Map metadataMap, final byte[] infosBytes ) { this.checkpoint = checkpoint; - this.snapshot = snapshot; + this.metadataMap = metadataMap; this.infosBytes = infosBytes; } public CheckpointInfoResponse(StreamInput in) throws IOException { this.checkpoint = new ReplicationCheckpoint(in); - this.snapshot = in.readMap(StreamInput::readString, StoreFileMetadata::new); + this.metadataMap = in.readMap(StreamInput::readString, StoreFileMetadata::new); this.infosBytes = in.readByteArray(); } @Override public void writeTo(StreamOutput out) throws IOException { checkpoint.writeTo(out); - out.writeMap(snapshot, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut)); + out.writeMap(metadataMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut)); out.writeByteArray(infosBytes); } @@ -57,8 +57,8 @@ public ReplicationCheckpoint getCheckpoint() { return checkpoint; } - public Map getSnapshot() { - return snapshot; + public Map getMetadataMap() { + return metadataMap; } public byte[] getInfosBytes() { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 91b8243440ac5..1cf1fa90ea078 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -20,6 +20,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.component.AbstractLifecycleComponent; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.CancellableThreads; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; @@ -131,20 +132,31 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan new AtomicLong(0), (throttleTime) -> {} ); - final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter); - channel.sendResponse( - new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) - ); - timer.stop(); - logger.trace( - new ParameterizedMessage( - "[replication id {}] Source node sent checkpoint info [{}] to target node [{}], timing: {}", - request.getReplicationId(), - copyState.getCheckpoint(), - request.getTargetNode().getId(), - timer.time() - ) - ); + try { + final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter); + channel.sendResponse( + new CheckpointInfoResponse( + copyState.getCheckpoint(), + copyState.getMetadataMap(), + copyState.getInfosBytes() + ) + ); + timer.stop(); + logger.trace( + new ParameterizedMessage( + "[replication id {}] Source node sent checkpoint info [{}] to target node [{}], timing: {}", + request.getReplicationId(), + copyState.getCheckpoint(), + request.getTargetNode().getId(), + timer.time() + ) + ); + } catch (IOException e) { + timer.stop(); + logger.error(new ParameterizedMessage( + "[replication id {}] Source node failed to compute checkpoint info for target node [{}], timing: {}", request.getReplicationId(), request.getTargetNode().getId(), timer.time()), e); + channel.sendResponse(new CancellableThreads.ExecutionCancelledException("Primary failed to compute CopyState. Replication can be retried")); + } } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index e1e5a6afa0002..99545129da7d9 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -174,7 +174,7 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener Date: Sun, 4 Sep 2022 14:25:44 -0700 Subject: [PATCH 05/10] Refactor segmentReplicationDiff method to compute off two maps instead of MetadataSnapshots. Signed-off-by: Marc Handalian --- .../org/opensearch/index/store/Store.java | 96 ++++++++----------- .../replication/SegmentReplicationTarget.java | 16 ++-- .../opensearch/index/store/StoreTests.java | 39 ++++++++ .../SegmentReplicationTargetTests.java | 4 +- 4 files changed, 89 insertions(+), 66 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index f0c1e92d90d64..aff25a84128a2 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -346,6 +346,42 @@ public Map getSegmentMetadataMap(SegmentInfos segment return loadMetadata(segmentInfos, directory, logger, true).fileMetadata; } + + /** + * Segment Replication method + * Returns a diff between the Maps of StoreFileMetadata that can be used for getting list of files to copy over to a replica for segment replication. The returned diff will hold a list of files that are: + *
    + *
  • identical: they exist in both maps and they can be considered the same ie. they don't need to be recovered
  • + *
  • different: they exist in both maps but their they are not identical
  • + *
  • missing: files that exist in the source but not in the target
  • + *
+ */ + public static RecoveryDiff segmentReplicationDiff(Map source, Map target) { + final List identical = new ArrayList<>(); + final List different = new ArrayList<>(); + final List missing = new ArrayList<>(); + for (StoreFileMetadata value : source.values()) { + if (value.name().startsWith(IndexFileNames.SEGMENTS)) { + continue; + } + if (target.containsKey(value.name()) == false) { + missing.add(value); + } else { + final StoreFileMetadata fileMetadata = target.get(value.name()); + if (fileMetadata.isSame(value)) { + identical.add(value); + } else { + different.add(value); + } + } + } + return new RecoveryDiff( + Collections.unmodifiableList(identical), + Collections.unmodifiableList(different), + Collections.unmodifiableList(missing) + ); + } + /** * Renames all the given files from the key of the map to the * value of the map. All successfully renamed files are removed from the map in-place. @@ -840,17 +876,9 @@ public void commitSegmentInfos(SegmentInfos latestSegmentInfos, long maxSeqNo, l userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); latestSegmentInfos.setUserData(userData, true); latestSegmentInfos.commit(directory()); - - // similar to TrimUnsafeCommits, create a commit with an appending IW, this will delete old commits and ensure all files - // associated with the SegmentInfos.commit are fsynced. - final List existingCommits = DirectoryReader.listCommits(directory); - assert existingCommits.isEmpty() == false : "Expected at least one commit but none found"; - final IndexCommit lastIndexCommit = existingCommits.get(existingCommits.size() - 1); - assert latestSegmentInfos.getSegmentsFileName().equals(lastIndexCommit.getSegmentsFileName()); - try (IndexWriter writer = newAppendingIndexWriter(directory, lastIndexCommit)) { - writer.setLiveCommitData(lastIndexCommit.getUserData().entrySet()); - writer.commit(); - } + directory.sync(latestSegmentInfos.files(true)); + directory.syncMetaData(); + cleanupAndPreserveLatestCommitPoint("After commit", latestSegmentInfos); } finally { metadataLock.writeLock().unlock(); } @@ -1170,7 +1198,6 @@ public Map asMap() { * Helper method used to group store files according to segment and commit. * * @see MetadataSnapshot#recoveryDiff(MetadataSnapshot) - * @see MetadataSnapshot#segmentReplicationDiff(MetadataSnapshot) */ private Iterable> getGroupedFilesIterable() { final Map> perSegment = new HashMap<>(); @@ -1263,51 +1290,6 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) { return recoveryDiff; } - /** - * Segment Replication method - * Returns a diff between the two snapshots that can be used for getting list of files to copy over to a replica for segment replication. The given snapshot is treated as the - * target and this snapshot as the source. The returned diff will hold a list of files that are: - *
    - *
  • identical: they exist in both snapshots and they can be considered the same ie. they don't need to be recovered
  • - *
  • different: they exist in both snapshots but their they are not identical
  • - *
  • missing: files that exist in the source but not in the target
  • - *
- */ - public RecoveryDiff segmentReplicationDiff(MetadataSnapshot recoveryTargetSnapshot) { - final List identical = new ArrayList<>(); - final List different = new ArrayList<>(); - final List missing = new ArrayList<>(); - final ArrayList identicalFiles = new ArrayList<>(); - for (List segmentFiles : getGroupedFilesIterable()) { - identicalFiles.clear(); - boolean consistent = true; - for (StoreFileMetadata meta : segmentFiles) { - StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name()); - if (storeFileMetadata == null) { - // Do not consider missing files as inconsistent in SegRep as replicas may lag while primary updates - // documents and generate new files specific to a segment - missing.add(meta); - } else if (storeFileMetadata.isSame(meta) == false) { - consistent = false; - different.add(meta); - } else { - identicalFiles.add(meta); - } - } - if (consistent) { - identical.addAll(identicalFiles); - } else { - different.addAll(identicalFiles); - } - } - RecoveryDiff recoveryDiff = new RecoveryDiff( - Collections.unmodifiableList(identical), - Collections.unmodifiableList(different), - Collections.unmodifiableList(missing) - ); - return recoveryDiff; - } - /** * Returns the number of files in this snapshot */ diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 99545129da7d9..a338d66a33da9 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -23,6 +23,7 @@ import org.opensearch.action.StepListener; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.util.CancellableThreads; import org.opensearch.index.shard.IndexShard; @@ -39,6 +40,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; +import java.util.Map; /** * Represents the target of a replication event. @@ -174,10 +176,8 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener getMetadataMap() throws IOException { if (indexShard.getSegmentInfosSnapshot() == null) { - return Store.MetadataSnapshot.EMPTY; + return Collections.emptyMap(); + } + try (final GatedCloseable snapshot = indexShard.getSegmentInfosSnapshot()) { + return store.getSegmentMetadataMap(snapshot.get()); } - return store.getMetadata(indexShard.getSegmentInfosSnapshot().get()); } @Override diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index 507712312e8e8..c20f33a54898d 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -96,6 +96,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.Iterator; @@ -104,6 +105,7 @@ import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; +import static java.util.Arrays.asList; import static java.util.Collections.unmodifiableMap; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; @@ -1218,6 +1220,43 @@ public void testGetSegmentMetadataMap() throws IOException { store.close(); } + public void testSegmentReplicationDiff() { + final String segmentName = "_0.si"; + final StoreFileMetadata SEGMENT_FILE = new StoreFileMetadata(segmentName, 1L, "0", Version.LATEST); + // source has file target is missing. + Store.RecoveryDiff diff = Store.segmentReplicationDiff(Map.of(segmentName, SEGMENT_FILE), Collections.emptyMap()); + assertEquals(List.of(SEGMENT_FILE), diff.missing); + assertTrue(diff.different.isEmpty()); + assertTrue(diff.identical.isEmpty()); + + // target has file not on source. + diff = Store.segmentReplicationDiff(Collections.emptyMap(), Map.of(segmentName, SEGMENT_FILE)); + assertTrue(diff.missing.isEmpty()); + assertTrue(diff.different.isEmpty()); + assertTrue(diff.identical.isEmpty()); + + // source and target have identical file. + diff = Store.segmentReplicationDiff(Map.of(segmentName, SEGMENT_FILE), Map.of(segmentName, SEGMENT_FILE)); + assertTrue(diff.missing.isEmpty()); + assertTrue(diff.different.isEmpty()); + assertEquals(List.of(SEGMENT_FILE), diff.identical); + + // source has diff copy of same file as target. + StoreFileMetadata SOURCE_DIFF_FILE = new StoreFileMetadata(segmentName, 1L, "abc", Version.LATEST); + diff = Store.segmentReplicationDiff(Map.of(segmentName, SOURCE_DIFF_FILE), Map.of(segmentName, SEGMENT_FILE)); + assertTrue(diff.missing.isEmpty()); + assertEquals(List.of(SOURCE_DIFF_FILE), diff.different); + assertTrue(diff.identical.isEmpty()); + + // ignore _N files if included in source map. + final String segmentsFile = IndexFileNames.SEGMENTS.concat("_2"); + StoreFileMetadata SEGMENTS_FILE = new StoreFileMetadata(segmentsFile, 1L, "abc", Version.LATEST); + diff = Store.segmentReplicationDiff(Map.of(segmentsFile, SEGMENTS_FILE), Collections.emptyMap()); + assertTrue(diff.missing.isEmpty()); + assertTrue(diff.different.isEmpty()); + assertTrue(diff.identical.isEmpty()); + } + private void commitRandomDocs(Store store) throws IOException { IndexWriter writer = indexRandomDocs(store); writer.commit(); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 93b0f46115fed..fff81ce028095 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -378,7 +378,7 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener.class ); segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener)); - when(segrepTarget.getMetadataSnapshot()).thenReturn(SI_SNAPSHOT_DIFFERENT); + when(segrepTarget.getMetadataMap()).thenReturn(SI_SNAPSHOT_DIFFERENT.asMap()); segrepTarget.startReplication(new ActionListener() { @Override public void onResponse(Void replicationResponse) { @@ -430,7 +430,7 @@ public void getSegmentFiles( ); segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener)); - when(segrepTarget.getMetadataSnapshot()).thenReturn(storeMetadataSnapshots.get(0)); + when(segrepTarget.getMetadataMap()).thenReturn(storeMetadataSnapshots.get(0).asMap()); segrepTarget.startReplication(new ActionListener() { @Override public void onResponse(Void replicationResponse) { From 7f6b9f82a4def22d3df63be431d8336c0ec1e15a Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Sun, 4 Sep 2022 14:33:57 -0700 Subject: [PATCH 06/10] Fix spotless. Signed-off-by: Marc Handalian --- .../org/opensearch/index/store/Store.java | 1 - .../SegmentReplicationSourceService.java | 21 ++++++++++++------- .../opensearch/index/store/StoreTests.java | 4 +--- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index aff25a84128a2..9122c950a6ab6 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -346,7 +346,6 @@ public Map getSegmentMetadataMap(SegmentInfos segment return loadMetadata(segmentInfos, directory, logger, true).fileMetadata; } - /** * Segment Replication method * Returns a diff between the Maps of StoreFileMetadata that can be used for getting list of files to copy over to a replica for segment replication. The returned diff will hold a list of files that are: diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 1cf1fa90ea078..e00cdfa9cc204 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -135,11 +135,7 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan try { final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter); channel.sendResponse( - new CheckpointInfoResponse( - copyState.getCheckpoint(), - copyState.getMetadataMap(), - copyState.getInfosBytes() - ) + new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); timer.stop(); logger.trace( @@ -153,9 +149,18 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan ); } catch (IOException e) { timer.stop(); - logger.error(new ParameterizedMessage( - "[replication id {}] Source node failed to compute checkpoint info for target node [{}], timing: {}", request.getReplicationId(), request.getTargetNode().getId(), timer.time()), e); - channel.sendResponse(new CancellableThreads.ExecutionCancelledException("Primary failed to compute CopyState. Replication can be retried")); + logger.error( + new ParameterizedMessage( + "[replication id {}] Source node failed to compute checkpoint info for target node [{}], timing: {}", + request.getReplicationId(), + request.getTargetNode().getId(), + timer.time() + ), + e + ); + channel.sendResponse( + new CancellableThreads.ExecutionCancelledException("Primary failed to compute CopyState. Replication can be retried") + ); } } } diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index c20f33a54898d..89b11d604d7a1 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -50,7 +50,6 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SnapshotDeletionPolicy; -import org.apache.lucene.index.StandardDirectoryReader; import org.apache.lucene.index.Term; import org.apache.lucene.store.ByteBuffersDirectory; import org.apache.lucene.store.ChecksumIndexInput; @@ -105,7 +104,6 @@ import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; -import static java.util.Arrays.asList; import static java.util.Collections.unmodifiableMap; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; @@ -1180,7 +1178,7 @@ public void testCleanupAndPreserveLatestCommitPoint() throws IOException { final List additionalSegments = new ArrayList<>(); for (String file : store.directory().listAll()) { - if (commitMetadata.contains(file) == false) { + if (commitMetadata.contains(file) == false) { additionalSegments.add(file); } } From e7f9de5ece19023d210ee34d3354e999a2353b8c Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Sun, 4 Sep 2022 14:43:09 -0700 Subject: [PATCH 07/10] Revert catchall in SegmentReplicationSourceService. Signed-off-by: Marc Handalian --- .../SegmentReplicationSourceService.java | 45 ++++++------------- 1 file changed, 14 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index e00cdfa9cc204..91b8243440ac5 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -20,7 +20,6 @@ import org.opensearch.common.Nullable; import org.opensearch.common.component.AbstractLifecycleComponent; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.CancellableThreads; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; @@ -132,36 +131,20 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan new AtomicLong(0), (throttleTime) -> {} ); - try { - final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter); - channel.sendResponse( - new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) - ); - timer.stop(); - logger.trace( - new ParameterizedMessage( - "[replication id {}] Source node sent checkpoint info [{}] to target node [{}], timing: {}", - request.getReplicationId(), - copyState.getCheckpoint(), - request.getTargetNode().getId(), - timer.time() - ) - ); - } catch (IOException e) { - timer.stop(); - logger.error( - new ParameterizedMessage( - "[replication id {}] Source node failed to compute checkpoint info for target node [{}], timing: {}", - request.getReplicationId(), - request.getTargetNode().getId(), - timer.time() - ), - e - ); - channel.sendResponse( - new CancellableThreads.ExecutionCancelledException("Primary failed to compute CopyState. Replication can be retried") - ); - } + final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter); + channel.sendResponse( + new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) + ); + timer.stop(); + logger.trace( + new ParameterizedMessage( + "[replication id {}] Source node sent checkpoint info [{}] to target node [{}], timing: {}", + request.getReplicationId(), + copyState.getCheckpoint(), + request.getTargetNode().getId(), + timer.time() + ) + ); } } From e60f8d407d66cc1362370812818dc284c5a878a6 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Sun, 4 Sep 2022 15:40:53 -0700 Subject: [PATCH 08/10] Revert log lvl change. Signed-off-by: Marc Handalian --- .../indices/replication/SegmentReplicationTarget.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index a338d66a33da9..122d53ab8cd32 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -177,7 +177,7 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener Date: Sun, 4 Sep 2022 17:20:24 -0700 Subject: [PATCH 09/10] Fix SegmentReplicationTargetTests Signed-off-by: Marc Handalian --- .../SegmentReplicationTargetTests.java | 38 ++++++------------- 1 file changed, 12 insertions(+), 26 deletions(-) diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index fff81ce028095..f8341573770a6 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -18,7 +18,6 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.Term; import org.apache.lucene.index.IndexFormatTooNewException; -import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.store.ByteBuffersDataOutput; import org.apache.lucene.store.ByteBuffersIndexOutput; import org.apache.lucene.store.Directory; @@ -70,26 +69,13 @@ public class SegmentReplicationTargetTests extends IndexShardTestCase { private ReplicationCheckpoint repCheckpoint; private ByteBuffersDataOutput buffer; - private static final StoreFileMetadata SEGMENTS_FILE = new StoreFileMetadata(IndexFileNames.SEGMENTS, 1L, "0", Version.LATEST); - private static final StoreFileMetadata SEGMENTS_FILE_DIFF = new StoreFileMetadata( - IndexFileNames.SEGMENTS, - 5L, - "different", - Version.LATEST - ); - private static final StoreFileMetadata PENDING_DELETE_FILE = new StoreFileMetadata("pendingDelete.del", 1L, "1", Version.LATEST); + private static final String SEGMENT_NAME = "_0.si"; + private static final StoreFileMetadata SEGMENT_FILE = new StoreFileMetadata(SEGMENT_NAME, 1L, "0", Version.LATEST); + private static final StoreFileMetadata SEGMENT_FILE_DIFF = new StoreFileMetadata(SEGMENT_NAME, 5L, "different", Version.LATEST); - private static final Store.MetadataSnapshot SI_SNAPSHOT = new Store.MetadataSnapshot( - Map.of(SEGMENTS_FILE.name(), SEGMENTS_FILE), - null, - 0 - ); + private static final Map SI_SNAPSHOT = Map.of(SEGMENT_FILE.name(), SEGMENT_FILE); - private static final Store.MetadataSnapshot SI_SNAPSHOT_DIFFERENT = new Store.MetadataSnapshot( - Map.of(SEGMENTS_FILE_DIFF.name(), SEGMENTS_FILE_DIFF), - null, - 0 - ); + private static final Map SI_SNAPSHOT_DIFFERENT = Map.of(SEGMENT_FILE_DIFF.name(), SEGMENT_FILE_DIFF); private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings( "index", @@ -134,7 +120,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT.asMap(), buffer.toArrayCopy())); + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy())); } @Override @@ -146,7 +132,7 @@ public void getSegmentFiles( ActionListener listener ) { assertEquals(1, filesToFetch.size()); - assert (filesToFetch.contains(SEGMENTS_FILE)); + assert (filesToFetch.contains(SEGMENT_FILE)); listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); } }; @@ -228,7 +214,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT.asMap(), buffer.toArrayCopy())); + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy())); } @Override @@ -271,7 +257,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT.asMap(), buffer.toArrayCopy())); + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy())); } @Override @@ -316,7 +302,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT.asMap(), buffer.toArrayCopy())); + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy())); } @Override @@ -360,7 +346,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT.asMap(), buffer.toArrayCopy())); + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy())); } @Override @@ -378,7 +364,7 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener.class ); segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener)); - when(segrepTarget.getMetadataMap()).thenReturn(SI_SNAPSHOT_DIFFERENT.asMap()); + when(segrepTarget.getMetadataMap()).thenReturn(SI_SNAPSHOT_DIFFERENT); segrepTarget.startReplication(new ActionListener() { @Override public void onResponse(Void replicationResponse) { From d07446e35bc1a1a41b6341f0d20bc2e0b8280cc5 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Sun, 4 Sep 2022 19:01:37 -0700 Subject: [PATCH 10/10] Cleanup unused logger. Signed-off-by: Marc Handalian --- .../org/opensearch/indices/replication/common/CopyState.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java index 9ded0125b7c17..1dd0886fd2f36 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java @@ -8,8 +8,6 @@ package org.opensearch.indices.replication.common; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.ByteBuffersDataOutput; @@ -41,7 +39,6 @@ public class CopyState extends AbstractRefCounted { private final byte[] infosBytes; private GatedCloseable commitRef; private final IndexShard shard; - public static final Logger logger = LogManager.getLogger(CopyState.class); public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShard shard) throws IOException { super("CopyState-" + shard.shardId());