diff --git a/server/src/internalClusterTest/java/org/opensearch/index/WaitUntilRefreshIT.java b/server/src/internalClusterTest/java/org/opensearch/index/WaitUntilRefreshIT.java index e38b128c04fde..a75057356fe8a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/WaitUntilRefreshIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/WaitUntilRefreshIT.java @@ -42,7 +42,10 @@ import org.opensearch.action.support.WriteRequest.RefreshPolicy; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Requests; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.Plugin; import org.opensearch.rest.RestStatus; import org.opensearch.script.MockScriptPlugin; @@ -74,7 +77,11 @@ public class WaitUntilRefreshIT extends OpenSearchIntegTestCase { @Override public Settings indexSettings() { // Use a shorter refresh interval to speed up the tests. We'll be waiting on this interval several times. - return Settings.builder().put(super.indexSettings()).put("index.refresh_interval", "40ms").build(); + final Settings.Builder builder = Settings.builder().put(super.indexSettings()).put("index.refresh_interval", "40ms"); + if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) { + builder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + } + return builder.build(); } @Before diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java new file mode 100644 index 0000000000000..617c090cd06aa --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -0,0 +1,422 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.replication; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import org.apache.lucene.index.SegmentInfos; +import org.junit.BeforeClass; +import org.opensearch.action.admin.indices.segments.IndexShardSegments; +import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.opensearch.action.admin.indices.segments.ShardSegments; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.Index; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexService; +import org.opensearch.index.engine.Segment; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.BackgroundIndexer; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationIT extends OpenSearchIntegTestCase { + + private static final String INDEX_NAME = "test-idx-1"; + private static final int SHARD_COUNT = 1; + private static final int REPLICA_COUNT = 1; + + @BeforeClass + public static void assumeFeatureFlag() { + assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE))); + } + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + } + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { + final String nodeA = internalCluster().startNode(); + final String nodeB = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureGreen(INDEX_NAME); + + final int initialDocCount = scaledRandomIntBetween(0, 200); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + refresh(INDEX_NAME); + waitForReplicaUpdate(); + + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + final int 
additionalDocCount = scaledRandomIntBetween(0, 200); + final int expectedHitCount = initialDocCount + additionalDocCount; + indexer.start(additionalDocCount); + waitForDocs(expectedHitCount, indexer); + + flushAndRefresh(INDEX_NAME); + waitForReplicaUpdate(); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + + ensureGreen(INDEX_NAME); + assertSegmentStats(REPLICA_COUNT); + } + } + + public void testMultipleShards() throws Exception { + Settings indexSettings = Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + final String nodeA = internalCluster().startNode(); + final String nodeB = internalCluster().startNode(); + createIndex(INDEX_NAME, indexSettings); + ensureGreen(INDEX_NAME); + + final int initialDocCount = scaledRandomIntBetween(1, 200); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + refresh(INDEX_NAME); + waitForReplicaUpdate(); + + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + final int additionalDocCount = scaledRandomIntBetween(0, 200); + final int expectedHitCount = initialDocCount + additionalDocCount; + indexer.start(additionalDocCount); + waitForDocs(expectedHitCount, indexer); + + flushAndRefresh(INDEX_NAME); + waitForReplicaUpdate(); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + + ensureGreen(INDEX_NAME); + assertSegmentStats(REPLICA_COUNT); + } + } + + public void testReplicationAfterForceMerge() throws Exception { + final String nodeA = internalCluster().startNode(); + final String nodeB = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureGreen(INDEX_NAME); + + final int initialDocCount = scaledRandomIntBetween(0, 200); + final int additionalDocCount = scaledRandomIntBetween(0, 200); + final int expectedHitCount = initialDocCount + additionalDocCount; + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + + flush(INDEX_NAME); + waitForReplicaUpdate(); + // wait a short amount of time to give replication a chance to complete. + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + // Index a second set of docs so we can merge into one segment. 
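+ // The force merge below will rewrite these segments into a single new segment, which replicas then fetch as one unit.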
+ indexer.start(additionalDocCount); + waitForDocs(expectedHitCount, indexer); + + // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk. + client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); + refresh(INDEX_NAME); + waitForReplicaUpdate(); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + + ensureGreen(INDEX_NAME); + assertSegmentStats(REPLICA_COUNT); + } + } + + public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { + final String primaryNode = internalCluster().startNode(); + createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); + ensureGreen(INDEX_NAME); + + // Index a doc to create the first set of segments. _s1.si + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get(); + // Flush segments to disk and create a new commit point (Primary: segments_3, _s1.si) + flushAndRefresh(INDEX_NAME); + assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + + // Index to create another segment + client().prepareIndex(INDEX_NAME).setId("2").setSource("foo", "bar").get(); + + // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk. + client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); + refresh(INDEX_NAME); + + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + ); + final String replicaNode = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + + client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get(); + + waitForReplicaUpdate(); + assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); + assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); + + final Index index = resolveIndex(INDEX_NAME); + IndexShard primaryShard = getIndexShard(index, primaryNode); + IndexShard replicaShard = getIndexShard(index, replicaNode); + assertEquals( + primaryShard.translogStats().estimatedNumberOfOperations(), + replicaShard.translogStats().estimatedNumberOfOperations() + ); + assertSegmentStats(REPLICA_COUNT); + } + + public void testDeleteOperations() throws Exception { + final String nodeA = internalCluster().startNode(); + final String nodeB = internalCluster().startNode(); + + createIndex(INDEX_NAME); + ensureGreen(INDEX_NAME); + final int initialDocCount = scaledRandomIntBetween(0, 200); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + refresh(INDEX_NAME); + waitForReplicaUpdate(); + + // wait a short amount of time to give replication a chance to complete. 
+ assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + final int additionalDocCount = scaledRandomIntBetween(0, 200); + final int expectedHitCount = initialDocCount + additionalDocCount; + indexer.start(additionalDocCount); + waitForDocs(expectedHitCount, indexer); + waitForReplicaUpdate(); + + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + + ensureGreen(INDEX_NAME); + + Set<String> ids = indexer.getIds(); + String id = ids.toArray()[0].toString(); + client(nodeA).prepareDelete(INDEX_NAME, id).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + + refresh(INDEX_NAME); + waitForReplicaUpdate(); + assertBusy(() -> { + final long nodeA_Count = client(nodeA).prepareSearch(INDEX_NAME) + .setSize(0) + .setPreference("_only_local") + .get() + .getHits() + .getTotalHits().value; + assertEquals(expectedHitCount - 1, nodeA_Count); + final long nodeB_Count = client(nodeB).prepareSearch(INDEX_NAME) + .setSize(0) + .setPreference("_only_local") + .get() + .getHits() + .getTotalHits().value; + assertEquals(expectedHitCount - 1, nodeB_Count); + }, 5, TimeUnit.SECONDS); + } + } + + private void assertSegmentStats(int numberOfReplicas) throws IOException { + final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet(); + + List<ShardSegments[]> segmentsByIndex = getShardSegments(indicesSegmentResponse); + + // There will be an entry in the list for each index. + for (ShardSegments[] replicationGroupSegments : segmentsByIndex) { + + // Separate primary & replica shards' ShardSegments. + final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments); + final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true); + final List<ShardSegments> replicaShardSegments = segmentListMap.get(false); + + assertEquals("There should only be one primary in the replicationGroup", 1, primaryShardSegmentsList.size()); + final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); + final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments); + + assertEquals( + "There should be a ShardSegment entry for each replica in the replicationGroup", + numberOfReplicas, + replicaShardSegments.size() + ); + + for (ShardSegments shardSegment : replicaShardSegments) { + final Map<String, Segment> latestReplicaSegments = getLatestSegments(shardSegment); + for (Segment replicaSegment : latestReplicaSegments.values()) { + final Segment primarySegment = latestPrimarySegments.get(replicaSegment.getName()); + assertEquals(replicaSegment.getGeneration(), primarySegment.getGeneration()); + assertEquals(replicaSegment.getNumDocs(), primarySegment.getNumDocs()); + assertEquals(replicaSegment.getDeletedDocs(), primarySegment.getDeletedDocs()); + assertEquals(replicaSegment.getSize(), primarySegment.getSize()); + } + + // Fetch the IndexShard for this replica and try to build its SegmentInfos from the previous commit point. + // This ensures the previous commit point is not wiped.
+ final ShardRouting replicaShardRouting = shardSegment.getShardRouting(); + ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState(); + final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId()); + final Index index = resolveIndex(INDEX_NAME); + IndexShard indexShard = getIndexShard(index, replicaNode.getName()); + final String lastCommitSegmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(indexShard.store().directory()); + // calls to readCommit will fail if a valid commit point and all its segments are not in the store. + SegmentInfos.readCommit(indexShard.store().directory(), lastCommitSegmentsFileName); + } + } + } + + /** + * Waits until the replica is caught up to the latest primary segments gen. + * @throws Exception + */ + private void waitForReplicaUpdate() throws Exception { + // wait until the replica has the latest segment generation. + assertBusy(() -> { + final IndicesSegmentResponse indicesSegmentResponse = client().admin() + .indices() + .segments(new IndicesSegmentsRequest()) + .actionGet(); + List<ShardSegments[]> segmentsByIndex = getShardSegments(indicesSegmentResponse); + for (ShardSegments[] replicationGroupSegments : segmentsByIndex) { + final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments); + final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true); + final List<ShardSegments> replicaShardSegments = segmentListMap.get(false); + // if we don't have any segments yet, proceed. + final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); + if (primaryShardSegments.getSegments().isEmpty() == false) { + final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments); + final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get(); + for (ShardSegments shardSegments : replicaShardSegments) { + final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments() + .stream() + .anyMatch(segment -> segment.getGeneration() == latestPrimaryGen); + assertTrue(isReplicaCaughtUpToPrimary); + } + } + } + }); + } + + private IndexShard getIndexShard(Index index, String node) { + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + IndexService indexService = indicesService.indexServiceSafe(index); + final Optional<Integer> shardId = indexService.shardIds().stream().findFirst(); + return indexService.getShard(shardId.get()); + } + + private List<ShardSegments[]> getShardSegments(IndicesSegmentResponse indicesSegmentResponse) { + return indicesSegmentResponse.getIndices() + .values() + .stream() // get list of IndexSegments + .flatMap(is -> is.getShards().values().stream()) // Map to shard replication group + .map(IndexShardSegments::getShards) // get list of segments across replication group + .collect(Collectors.toList()); + } + + private Map<String, Segment> getLatestSegments(ShardSegments segments) { + final Long latestPrimaryGen = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare).get(); + return segments.getSegments() + .stream() + .filter(s -> s.getGeneration() == latestPrimaryGen) + .collect(Collectors.toMap(Segment::getName, Function.identity())); + } + + private Map<Boolean, List<ShardSegments>> segmentsByShardType(ShardSegments[] replicationGroupSegments) { + return Arrays.stream(replicationGroupSegments).collect(Collectors.groupingBy(s -> s.getShardRouting().primary())); + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 414426921201f..800e6d87c4b81 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -71,6 +71,12 @@ public NRTReplicationEngine(EngineConfig engineConfig) { this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats")); this.readerManager = readerManager; this.readerManager.addListener(completionStatsCache); + for (ReferenceManager.RefreshListener listener : engineConfig.getExternalRefreshListener()) { + this.readerManager.addListener(listener); + } + for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) { + this.readerManager.addListener(listener); + } final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData(); final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY)); translogManagerRef = new WriteOnlyTranslogManager( @@ -199,6 +205,18 @@ protected ReferenceManager getReferenceManager(Search return readerManager; } + /** + * Refreshing of this engine will only happen internally when a new set of segments is received. The engine will ignore external + * refresh attempts, so we can return false here. Further, the Engine's existing implementation reads DirectoryReader.isCurrent after acquiring a searcher. + * With this engine's NRTReplicationReaderManager, that check uses StandardDirectoryReader's implementation, which determines whether the reader is current by + * comparing the on-disk SegmentInfos version against the one in the reader, so at refresh points it will always return isCurrent false and therefore refreshNeeded true. + * Even if this method reports that a refresh is needed, we ignore it and only ever refresh with incoming SegmentInfos. + */ + @Override + public boolean refreshNeeded() { + return false; + } + @Override public boolean isTranslogSyncNeeded() { return translogManager.getTranslog().syncNeeded(); diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index ac6754bf6a74a..96d74bea85920 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -40,7 +40,7 @@ public void beforeRefresh() throws IOException { @Override public void afterRefresh(boolean didRefresh) throws IOException { - if (didRefresh) { + if (didRefresh && shard.getReplicationTracker().isPrimaryMode()) { publisher.publish(shard); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 442cbeb913d93..f3ad41d56687b 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1382,9 +1382,16 @@ public GatedCloseable acquireSafeIndexCommit() throws EngineExcepti } /** - * Returns the lastest Replication Checkpoint that shard received
+ * @return EMPTY checkpoint before the engine is opened and null for non-segrep enabled indices */ public ReplicationCheckpoint getLatestReplicationCheckpoint() { + if (indexSettings.isSegRepEnabled() == false) { + return null; + } + if (getEngineOrNull() == null) { + return ReplicationCheckpoint.empty(shardId); + } try (final GatedCloseable snapshot = getSegmentInfosSnapshot()) { return Optional.ofNullable(snapshot.get()) .map( @@ -1396,15 +1403,7 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { segmentInfos.getVersion() ) ) - .orElse( - new ReplicationCheckpoint( - shardId, - getOperationPrimaryTerm(), - SequenceNumbers.NO_OPS_PERFORMED, - getProcessedLocalCheckpoint(), - SequenceNumbers.NO_OPS_PERFORMED - ) - ); + .orElse(ReplicationCheckpoint.empty(shardId)); } catch (IOException ex) { throw new OpenSearchException("Error Closing SegmentInfos Snapshot", ex); } @@ -1421,6 +1420,10 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state())); return false; } + if (getReplicationTracker().isPrimaryMode()) { + logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints."); + return false; + } ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); if (localCheckpoint.isAheadOf(requestCheckpoint)) { logger.trace( @@ -3774,6 +3777,10 @@ public boolean scheduledRefresh() { if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it && isSearchIdle() && indexSettings.isExplicitRefresh() == false + && indexSettings.isSegRepEnabled() == false + // Indices with segrep enabled will never wait on a refresh and ignore shard idle. Primary shards push out new segments only + // after a refresh, so we don't want to wait for a search to trigger that cycle. Replicas will only refresh after receiving + // a new set of segments. && active.get()) { // it must be active otherwise we might not free up segment memory once the shard became inactive // lets skip this refresh since we are search idle and // don't necessarily need to refresh. the next searcher access will register a refreshListener and that will diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 2309004c0777d..163717ad94c2c 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -110,8 +110,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -1003,7 +1003,12 @@ static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory director // version is written since 3.1+: we should have already hit IndexFormatTooOld. throw new IllegalArgumentException("expected valid version value: " + info.info.toString()); } - if (version.onOrAfter(maxVersion)) { + // With segment replication enabled, we compute metadata snapshots from the latest in memory infos. + // In this case we will have SegmentInfos objects fetched from the primary's reader + // where the minSegmentLuceneVersion can be null even though there are segments. 
+ // This is because the SegmentInfos object is not read from a commit/IndexInput, which sets + // minSegmentLuceneVersion. + if (maxVersion == null || version.onOrAfter(maxVersion)) { maxVersion = version; } for (String file : info.files()) { @@ -1097,6 +1102,30 @@ public Map<String, StoreFileMetadata> asMap() { private static final String LIV_FILE_EXTENSION = "liv"; // lucene 5 delete file private static final String SEGMENT_INFO_EXTENSION = "si"; + /** + * Helper method used to group store files according to segment and commit. + * + * @see MetadataSnapshot#recoveryDiff(MetadataSnapshot) + * @see MetadataSnapshot#segmentReplicationDiff(MetadataSnapshot) + */ + private Iterable<List<StoreFileMetadata>> getGroupedFilesIterable() { + final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>(); + final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>(); + for (StoreFileMetadata meta : this) { + final String segmentId = IndexFileNames.parseSegmentName(meta.name()); + final String extension = IndexFileNames.getExtension(meta.name()); + if (IndexFileNames.SEGMENTS.equals(segmentId) + || DEL_FILE_EXTENSION.equals(extension) + || LIV_FILE_EXTENSION.equals(extension)) { + // only treat del files as per-commit files fnm files are generational but only for upgradable DV + perCommitStoreFiles.add(meta); + } else { + perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta); + } + } + return Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles)); + } + /** * Returns a diff between the two snapshots that can be used for recovery. The given snapshot is treated as the * recovery target and this snapshot as the source. The returned diff will hold a list of files that are: @@ -1134,23 +1163,8 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) { final List<StoreFileMetadata> identical = new ArrayList<>(); final List<StoreFileMetadata> different = new ArrayList<>(); final List<StoreFileMetadata> missing = new ArrayList<>(); - final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>(); - final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>(); - - for (StoreFileMetadata meta : this) { - final String segmentId = IndexFileNames.parseSegmentName(meta.name()); - final String extension = IndexFileNames.getExtension(meta.name()); - if (IndexFileNames.SEGMENTS.equals(segmentId) - || DEL_FILE_EXTENSION.equals(extension) - || LIV_FILE_EXTENSION.equals(extension)) { - // only treat del files as per-commit files fnm files are generational but only for upgradable DV - perCommitStoreFiles.add(meta); - } else { - perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta); - } - } final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>(); - for (List<StoreFileMetadata> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) { + for (List<StoreFileMetadata> segmentFiles : getGroupedFilesIterable()) { identicalFiles.clear(); boolean consistent = true; for (StoreFileMetadata meta : segmentFiles) { @@ -1185,6 +1199,51 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) { return recoveryDiff; } + /** + * Segment Replication method. + * Returns a diff between the two snapshots that can be used to get the list of files to copy over to a replica for segment replication. The given snapshot is treated as the + * target and this snapshot as the source. The returned diff will hold a list of files that are: + * <ul>
+ * <li>identical: they exist in both snapshots and they can be considered the same, i.e. they don't need to be recovered</li>
+ * <li>different: they exist in both snapshots but they are not identical</li>
+ * <li>missing: files that exist in the source but not in the target</li>
+ * </ul>
+ */ + public RecoveryDiff segmentReplicationDiff(MetadataSnapshot recoveryTargetSnapshot) { + final List<StoreFileMetadata> identical = new ArrayList<>(); + final List<StoreFileMetadata> different = new ArrayList<>(); + final List<StoreFileMetadata> missing = new ArrayList<>(); + final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>(); + for (List<StoreFileMetadata> segmentFiles : getGroupedFilesIterable()) { + identicalFiles.clear(); + boolean consistent = true; + for (StoreFileMetadata meta : segmentFiles) { + StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name()); + if (storeFileMetadata == null) { + // Do not consider missing files as inconsistent in SegRep as replicas may lag while primary updates + // documents and generate new files specific to a segment + missing.add(meta); + } else if (storeFileMetadata.isSame(meta) == false) { + consistent = false; + different.add(meta); + } else { + identicalFiles.add(meta); + } + } + if (consistent) { + identical.addAll(identicalFiles); + } else { + different.addAll(identicalFiles); + } + } + RecoveryDiff recoveryDiff = new RecoveryDiff( + Collections.unmodifiableList(identical), + Collections.unmodifiableList(different), + Collections.unmodifiableList(missing) + ); + return recoveryDiff; + } + /** * Returns the number of files in this snapshot */ diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index a600581384f31..ed66fb448ba95 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -56,6 +56,7 @@ import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.env.ShardLockObtainFailedException; @@ -80,6 +81,7 @@ import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.repositories.RepositoriesService; @@ -90,6 +92,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -134,7 +137,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final FailedShardHandler failedShardHandler = new FailedShardHandler(); private final boolean sendRefreshMapping; - private final List<IndexEventListener> buildInIndexListener; + private final List<IndexEventListener> builtInIndexListener; private final PrimaryReplicaSyncer primaryReplicaSyncer; private final Consumer<ShardId> globalCheckpointSyncer; private final RetentionLeaseSyncer retentionLeaseSyncer; @@ -148,6 +151,7 @@ public IndicesClusterStateService( final ClusterService clusterService, final ThreadPool threadPool, final PeerRecoveryTargetService recoveryTargetService, + final SegmentReplicationTargetService segmentReplicationTargetService, final ShardStateAction shardStateAction, final NodeMappingRefreshAction
nodeMappingRefreshAction, final RepositoriesService repositoriesService, @@ -165,6 +169,7 @@ public IndicesClusterStateService( clusterService, threadPool, checkpointPublisher, + segmentReplicationTargetService, recoveryTargetService, shardStateAction, nodeMappingRefreshAction, @@ -185,6 +190,7 @@ public IndicesClusterStateService( final ClusterService clusterService, final ThreadPool threadPool, final SegmentReplicationCheckpointPublisher checkpointPublisher, + final SegmentReplicationTargetService segmentReplicationTargetService, final PeerRecoveryTargetService recoveryTargetService, final ShardStateAction shardStateAction, final NodeMappingRefreshAction nodeMappingRefreshAction, @@ -198,7 +204,15 @@ public IndicesClusterStateService( ) { this.settings = settings; this.checkpointPublisher = checkpointPublisher; - this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService); + + final List<IndexEventListener> indexEventListeners = new ArrayList<>( + Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService) + ); + // if the segrep feature flag is not enabled, don't wire the target service as an IndexEventListener. + if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) { + indexEventListeners.add(segmentReplicationTargetService); + } + this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners); this.indicesService = indicesService; this.clusterService = clusterService; this.threadPool = threadPool; @@ -514,7 +528,7 @@ private void createIndices(final ClusterState state) { AllocatedIndex<? extends Shard> indexService = null; try { - indexService = indicesService.createIndex(indexMetadata, buildInIndexListener, true); + indexService = indicesService.createIndex(indexMetadata, builtInIndexListener, true); if (indexService.updateMapping(null, indexMetadata) && sendRefreshMapping) { nodeMappingRefreshAction.nodeMappingRefresh( state.nodes().getClusterManagerNode(), diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index 426409f7a5b65..652f3c9a55f53 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -62,10 +62,13 @@ import org.opensearch.indices.replication.common.ReplicationCollection; import java.io.IOException; +import java.nio.channels.FileChannel; import java.nio.file.Path; import java.util.List; import java.util.concurrent.CountDownLatch; +import static org.opensearch.index.translog.Translog.TRANSLOG_UUID_KEY; + /** * Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of * this class are created through {@link ReplicationCollection}. @@ -398,13 +401,29 @@ public void cleanFiles( store.incRef(); try { store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetadata); - final String translogUUID = Translog.createEmptyTranslog( - indexShard.shardPath().resolveTranslog(), - globalCheckpoint, - shardId(), - indexShard.getPendingPrimaryTerm() - ); - store.associateIndexWithNewTranslog(translogUUID); + + // If Segment Replication is enabled, we need to reuse the primary's translog UUID already stored in the index. + // With Segrep, replicas should never create their own commit points.
This ensures the index and translog share the same + // UUID without the extra step to associate the index with a new translog. + if (indexShard.indexSettings().isSegRepEnabled()) { + final String translogUUID = store.getMetadata().getCommitUserData().get(TRANSLOG_UUID_KEY); + Translog.createEmptyTranslog( + indexShard.shardPath().resolveTranslog(), + shardId(), + globalCheckpoint, + indexShard.getPendingPrimaryTerm(), + translogUUID, + FileChannel::open + ); + } else { + final String translogUUID = Translog.createEmptyTranslog( + indexShard.shardPath().resolveTranslog(), + globalCheckpoint, + shardId(), + indexShard.getPendingPrimaryTerm() + ); + store.associateIndexWithNewTranslog(translogUUID); + } if (indexShard.getRetentionLeases().leases().isEmpty()) { // if empty, may be a fresh IndexShard, so write an empty leases file to disk diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index 6302d364fc6d1..dfebe5f7cabf2 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -24,7 +24,10 @@ import java.io.IOException; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.function.Predicate; +import java.util.stream.Collectors; /** * Manages references to ongoing segrep events on a node. @@ -38,7 +41,7 @@ class OngoingSegmentReplications { private final RecoverySettings recoverySettings; private final IndicesService indicesService; private final Map<ReplicationCheckpoint, CopyState> copyStateMap; - private final Map<DiscoveryNode, SegmentReplicationSourceHandler> nodesToHandlers; + private final Map<String, SegmentReplicationSourceHandler> allocationIdToHandlers; /** * Constructor. @@ -50,7 +53,7 @@ class OngoingSegmentReplications { this.indicesService = indicesService; this.recoverySettings = recoverySettings; this.copyStateMap = Collections.synchronizedMap(new HashMap<>()); - this.nodesToHandlers = ConcurrentCollections.newConcurrentMap(); + this.allocationIdToHandlers = ConcurrentCollections.newConcurrentMap(); } /** @@ -96,8 +99,7 @@ synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) thro * @param listener {@link ActionListener} that resolves when sending files is complete.
*/ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener listener) { - final DiscoveryNode node = request.getTargetNode(); - final SegmentReplicationSourceHandler handler = nodesToHandlers.get(node); + final SegmentReplicationSourceHandler handler = allocationIdToHandlers.get(request.getTargetAllocationId()); if (handler != null) { if (handler.isReplicating()) { throw new OpenSearchException( @@ -108,30 +110,21 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener wrappedListener = ActionListener.runBefore(listener, () -> { - final SegmentReplicationSourceHandler sourceHandler = nodesToHandlers.remove(node); + final SegmentReplicationSourceHandler sourceHandler = allocationIdToHandlers.remove(request.getTargetAllocationId()); if (sourceHandler != null) { removeCopyState(sourceHandler.getCopyState()); } }); - handler.sendFiles(request, wrappedListener); + if (request.getFilesToFetch().isEmpty()) { + wrappedListener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); + } else { + handler.sendFiles(request, wrappedListener); + } } else { listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); } } - /** - * Cancel any ongoing replications for a given {@link DiscoveryNode} - * - * @param node {@link DiscoveryNode} node for which to cancel replication events. - */ - void cancelReplication(DiscoveryNode node) { - final SegmentReplicationSourceHandler handler = nodesToHandlers.remove(node); - if (handler != null) { - handler.cancel("Cancel on node left"); - removeCopyState(handler.getCopyState()); - } - } - /** * Prepare for a Replication event. This method constructs a {@link CopyState} holding files to be sent off of the current * nodes's store. This state is intended to be sent back to Replicas before copy is initiated so the replica can perform a diff against its @@ -145,9 +138,9 @@ void cancelReplication(DiscoveryNode node) { */ CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException { final CopyState copyState = getCachedCopyState(request.getCheckpoint()); - if (nodesToHandlers.putIfAbsent( - request.getTargetNode(), - createTargetHandler(request.getTargetNode(), copyState, fileChunkWriter) + if (allocationIdToHandlers.putIfAbsent( + request.getTargetAllocationId(), + createTargetHandler(request.getTargetNode(), copyState, request.getTargetAllocationId(), fileChunkWriter) ) != null) { throw new OpenSearchException( "Shard copy {} on node {} already replicating", @@ -159,18 +152,23 @@ CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter f } /** - * Cancel all Replication events for the given shard, intended to be called when the current primary is shutting down. + * Cancel all Replication events for the given shard, intended to be called when a primary is shutting down. * * @param shard {@link IndexShard} * @param reason {@link String} - Reason for the cancel */ synchronized void cancel(IndexShard shard, String reason) { - for (SegmentReplicationSourceHandler entry : nodesToHandlers.values()) { - if (entry.getCopyState().getShard().equals(shard)) { - entry.cancel(reason); - } - } - copyStateMap.clear(); + cancelHandlers(handler -> handler.getCopyState().getShard().shardId().equals(shard.shardId()), reason); + } + + /** + * Cancel any ongoing replications for a given {@link DiscoveryNode} + * + * @param node {@link DiscoveryNode} node for which to cancel replication events. 
+ */ + void cancelReplication(DiscoveryNode node) { + cancelHandlers(handler -> handler.getTargetNode().equals(node), "Node left"); + } /** @@ -182,19 +180,25 @@ boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { } int size() { - return nodesToHandlers.size(); + return allocationIdToHandlers.size(); } int cachedCopyStateSize() { return copyStateMap.size(); } - private SegmentReplicationSourceHandler createTargetHandler(DiscoveryNode node, CopyState copyState, FileChunkWriter fileChunkWriter) { + private SegmentReplicationSourceHandler createTargetHandler( + DiscoveryNode node, + CopyState copyState, + String allocationId, + FileChunkWriter fileChunkWriter + ) { return new SegmentReplicationSourceHandler( node, fileChunkWriter, copyState.getShard().getThreadPool(), copyState, + allocationId, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks() ); @@ -227,4 +231,23 @@ private synchronized void removeCopyState(CopyState copyState) { copyStateMap.remove(copyState.getRequestedReplicationCheckpoint()); } } + + /** + * Remove handlers from allocationIdToHandlers map based on a filter predicate. + * This will also decref the handler's CopyState reference. + */ + private void cancelHandlers(Predicate predicate, String reason) { + final List allocationIds = allocationIdToHandlers.values() + .stream() + .filter(predicate) + .map(SegmentReplicationSourceHandler::getAllocationId) + .collect(Collectors.toList()); + for (String allocationId : allocationIds) { + final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId); + if (handler != null) { + handler.cancel(reason); + removeCopyState(handler.getCopyState()); + } + } + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java index afbb80d263805..1867fc59c5a56 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java @@ -23,9 +23,9 @@ */ public class SegmentReplicationSourceFactory { - private TransportService transportService; - private RecoverySettings recoverySettings; - private ClusterService clusterService; + private final TransportService transportService; + private final RecoverySettings recoverySettings; + private final ClusterService clusterService; public SegmentReplicationSourceFactory( TransportService transportService, @@ -39,7 +39,7 @@ public SegmentReplicationSourceFactory( public SegmentReplicationSource get(IndexShard shard) { return new PrimaryShardReplicationSource( - clusterService.localNode(), + shard.recoveryState().getTargetNode(), shard.routingEntry().allocationId().getId(), transportService, recoverySettings, diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index fdabd48c62929..ce764900e433f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -54,6 +54,8 @@ class SegmentReplicationSourceHandler { private final List resources = new CopyOnWriteArrayList<>(); private final Logger logger; private final AtomicBoolean isReplicating = 
new AtomicBoolean(); + private final DiscoveryNode targetNode; + private final String allocationId; /** * Constructor. @@ -70,9 +72,11 @@ class SegmentReplicationSourceHandler { FileChunkWriter writer, ThreadPool threadPool, CopyState copyState, + String allocationId, int fileChunkSizeInBytes, int maxConcurrentFileChunks ) { + this.targetNode = targetNode; this.shard = copyState.getShard(); this.logger = Loggers.getLogger( SegmentReplicationSourceHandler.class, @@ -89,6 +93,7 @@ class SegmentReplicationSourceHandler { fileChunkSizeInBytes, maxConcurrentFileChunks ); + this.allocationId = allocationId; this.copyState = copyState; } @@ -118,7 +123,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene logger.debug( "delaying replication of {} as it is not listed as assigned to target node {}", shard.shardId(), - request.getTargetNode() + targetNode ); throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); } @@ -167,4 +172,12 @@ CopyState getCopyState() { public boolean isReplicating() { return isReplicating.get(); } + + public DiscoveryNode getTargetNode() { + return targetNode; + } + + public String getAllocationId() { + return allocationId; + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index fb68e59f3b2ef..73d9a2f805d75 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -154,7 +154,7 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener listener) { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 0c3350f224b11..f699f0edba842 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -31,6 +31,8 @@ import org.opensearch.transport.TransportRequestHandler; import org.opensearch.transport.TransportService; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; /** @@ -49,6 +51,29 @@ public class SegmentReplicationTargetService implements IndexEventListener { private final SegmentReplicationSourceFactory sourceFactory; + private final Map latestReceivedCheckpoint = new HashMap<>(); + + // Empty Implementation, only required while Segment Replication is under feature flag. + public static final SegmentReplicationTargetService NO_OP = new SegmentReplicationTargetService() { + @Override + public void beforeIndexShardClosed(ShardId shardId, IndexShard indexShard, Settings indexSettings) { + // NoOp; + } + + @Override + public synchronized void onNewCheckpoint(ReplicationCheckpoint receivedCheckpoint, IndexShard replicaShard) { + // noOp; + } + }; + + // Used only for empty implementation. 
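+ // The NO_OP instance overrides its listener entry points above, so these null dependencies are never used.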
+ private SegmentReplicationTargetService() { + threadPool = null; + recoverySettings = null; + onGoingReplications = null; + sourceFactory = null; + } + /** * The internal actions * @@ -91,6 +116,15 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh * @param replicaShard replica shard on which checkpoint is received */ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) { + + // If we already have a received checkpoint for this shard and the new one is ahead of it, replace the old checkpoint. + if (latestReceivedCheckpoint.get(replicaShard.shardId()) != null) { + if (receivedCheckpoint.isAheadOf(latestReceivedCheckpoint.get(replicaShard.shardId()))) { + latestReceivedCheckpoint.replace(replicaShard.shardId(), receivedCheckpoint); + } + } else { + latestReceivedCheckpoint.put(replicaShard.shardId(), receivedCheckpoint); + } if (onGoingReplications.isShardReplicating(replicaShard.shardId())) { logger.trace( () -> new ParameterizedMessage( @@ -100,10 +134,23 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe ); return; } + final Thread thread = Thread.currentThread(); if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) { startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() { @Override - public void onReplicationDone(SegmentReplicationState state) {} + public void onReplicationDone(SegmentReplicationState state) { + // if we received a checkpoint during the copy event that is ahead of this one, + // try to process it. + if (latestReceivedCheckpoint.get(replicaShard.shardId()).isAheadOf(replicaShard.getLatestReplicationCheckpoint())) { + Runnable runnable = () -> onNewCheckpoint(latestReceivedCheckpoint.get(replicaShard.shardId()), replicaShard); + // Check if we are still on the same thread and fork if necessary.
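+ // Forking to the generic pool avoids recursively re-entering onNewCheckpoint on the thread that delivered the checkpoint.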
+ if (thread == Thread.currentThread()) { + threadPool.generic().execute(runnable); + } else { + runnable.run(); + } + } + } @Override public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index f84a65206190b..8afb5bd055636 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -12,6 +12,7 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; +import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.ShardId; import java.io.IOException; @@ -30,6 +31,18 @@ public class ReplicationCheckpoint implements Writeable { private final long seqNo; private final long segmentInfosVersion; + public static ReplicationCheckpoint empty(ShardId shardId) { + return new ReplicationCheckpoint(shardId); + } + + private ReplicationCheckpoint(ShardId shardId) { + this.shardId = shardId; + primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM; + segmentsGen = SequenceNumbers.NO_OPS_PERFORMED; + seqNo = SequenceNumbers.NO_OPS_PERFORMED; + segmentInfosVersion = SequenceNumbers.NO_OPS_PERFORMED; + } + public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long seqNo, long segmentInfosVersion) { this.shardId = shardId; this.primaryTerm = primaryTerm; @@ -112,10 +125,13 @@ public int hashCode() { } /** - * Checks if other is aheadof current replication point by comparing segmentInfosVersion. Returns true for null + * Checks if the current replication checkpoint is ahead of the `other` replication checkpoint by first comparing + * primaryTerm, then segmentInfosVersion. Returns true when `other` is null. */ public boolean isAheadOf(@Nullable ReplicationCheckpoint other) { - return other == null || segmentInfosVersion > other.getSegmentInfosVersion() || primaryTerm > other.getPrimaryTerm(); + return other == null + || primaryTerm > other.getPrimaryTerm() + || (primaryTerm == other.getPrimaryTerm() && segmentInfosVersion > other.getSegmentInfosVersion()); } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index 27e23ceafb15e..501ff46eeb2ff 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -49,7 +49,6 @@ public abstract class ReplicationTarget extends AbstractRefCounted { private final long id; protected final AtomicBoolean finished = new AtomicBoolean(); - private final ShardId shardId; protected final IndexShard indexShard; protected final Store store; protected final ReplicationListener listener; @@ -89,7 +88,6 @@ public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIn this.stateIndex = stateIndex; this.indexShard = indexShard; this.store = indexShard.store(); - this.shardId = indexShard.shardId(); // make sure the store is not released until we are done.
this.cancellableThreads = new CancellableThreads(); store.incRef(); @@ -131,7 +129,7 @@ public Store store() { } public ShardId shardId() { - return shardId; + return indexShard.shardId(); } /** diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 346bff9afe296..0ac8471be7087 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -961,6 +961,8 @@ protected Node( ); b.bind(SegmentReplicationSourceService.class) .toInstance(new SegmentReplicationSourceService(indicesService, transportService, recoverySettings)); + } else { + b.bind(SegmentReplicationTargetService.class).toInstance(SegmentReplicationTargetService.NO_OP); } } b.bind(HttpServerTransport.class).toInstance(httpServerTransport); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 49d0c089f072b..a3a49e9e30564 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -3453,7 +3453,7 @@ public void testCheckpointRefreshListenerWithNull() throws IOException { } /** - * creates a new initializing shard. The shard will will be put in its proper path under the + * creates a new initializing shard. The shard will be put in its proper path under the * current node id the shard is assigned to. * @param checkpointPublisher Segment Replication Checkpoint Publisher to publish checkpoint */ diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java new file mode 100644 index 0000000000000..4f6d86c13e12c --- /dev/null +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -0,0 +1,180 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.shard; + +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.DocIdSeqNoAndSource; +import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationType; + +import java.io.IOException; +import java.util.List; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; + +public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase { + + private static final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + + /** + * Test that latestReplicationCheckpoint returns null only for docrep enabled indices + */ + public void testReplicationCheckpointNullForDocRep() throws IOException { + Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, "DOCUMENT").put(Settings.EMPTY).build(); + final IndexShard indexShard = newStartedShard(false, indexSettings); + assertNull(indexShard.getLatestReplicationCheckpoint()); + closeShards(indexShard); + } + + /** + * Test that latestReplicationCheckpoint returns ReplicationCheckpoint for segrep enabled indices + */ + public void testReplicationCheckpointNotNullForSegReb() throws IOException { + Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT").put(Settings.EMPTY).build(); + final IndexShard indexShard = newStartedShard(indexSettings); + final ReplicationCheckpoint replicationCheckpoint = indexShard.getLatestReplicationCheckpoint(); + assertNotNull(replicationCheckpoint); + closeShards(indexShard); + } + + public void testSegmentReplication_Index_Update_Delete() throws Exception { + String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; + try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory())) { + shards.startAll(); + final IndexShard primaryShard = shards.getPrimary(); + + final int numDocs = randomIntBetween(100, 200); + for (int i = 0; i < numDocs; i++) { + shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); + } + + primaryShard.refresh("Test"); + replicateSegments(primaryShard, shards.getReplicas()); + + shards.assertAllEqual(numDocs); + + for (int i = 0; i < numDocs; i++) { + // randomly update docs. 
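+ // Updates re-use existing doc ids, so the total doc count stays at numDocs while new segments are produced.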
+
+    public void testSegmentReplication_Index_Update_Delete() throws Exception {
+        String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}";
+        try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory())) {
+            shards.startAll();
+            final IndexShard primaryShard = shards.getPrimary();
+
+            final int numDocs = randomIntBetween(100, 200);
+            for (int i = 0; i < numDocs; i++) {
+                shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON));
+            }
+
+            primaryShard.refresh("Test");
+            replicateSegments(primaryShard, shards.getReplicas());
+
+            shards.assertAllEqual(numDocs);
+
+            for (int i = 0; i < numDocs; i++) {
+                // randomly update docs.
+                if (randomBoolean()) {
+                    shards.index(
+                        new IndexRequest(index.getName()).id(String.valueOf(i)).source("{ \"foo\" : \"baz\" }", XContentType.JSON)
+                    );
+                }
+            }
+
+            primaryShard.refresh("Test");
+            replicateSegments(primaryShard, shards.getReplicas());
+            shards.assertAllEqual(numDocs);
+
+            final List<DocIdSeqNoAndSource> docs = getDocIdAndSeqNos(primaryShard);
+            for (IndexShard shard : shards.getReplicas()) {
+                assertEquals(getDocIdAndSeqNos(shard), docs);
+            }
+            for (int i = 0; i < numDocs; i++) {
+                // randomly delete.
+                if (randomBoolean()) {
+                    shards.delete(new DeleteRequest(index.getName()).id(String.valueOf(i)));
+                }
+            }
+            primaryShard.refresh("Test");
+            replicateSegments(primaryShard, shards.getReplicas());
+            final List<DocIdSeqNoAndSource> docsAfterDelete = getDocIdAndSeqNos(primaryShard);
+            for (IndexShard shard : shards.getReplicas()) {
+                assertEquals(getDocIdAndSeqNos(shard), docsAfterDelete);
+            }
+        }
+    }
+
+    public void testIgnoreShardIdle() throws Exception {
+        try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
+            shards.startAll();
+            final IndexShard primary = shards.getPrimary();
+            final IndexShard replica = shards.getReplicas().get(0);
+
+            final int numDocs = shards.indexDocs(randomInt(10));
+            primary.refresh("test");
+            replicateSegments(primary, shards.getReplicas());
+            shards.assertAllEqual(numDocs);
+
+            primary.scheduledRefresh();
+            replica.scheduledRefresh();
+
+            primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
+            replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
+
+            // Update the search_idle setting; this will put both shards into search idle.
+            Settings updatedSettings = Settings.builder()
+                .put(settings)
+                .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO)
+                .build();
+            primary.indexSettings().getScopedSettings().applySettings(updatedSettings);
+            replica.indexSettings().getScopedSettings().applySettings(updatedSettings);
+
+            primary.scheduledRefresh();
+            replica.scheduledRefresh();
+
+            // Shards without segrep would register a new RefreshListener on the engine and return true when registered.
+            // With segrep enabled, assert that awaitShardSearchActive does not register a listener.
+            primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
+            replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
+        }
+    }
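Both tests drive replication by hand rather than waiting on cluster machinery, and the same three-step cycle recurs throughout this file. Condensed from the test code above:

    // Write on the primary, refresh to cut new segments, copy them to the
    // replicas, then assert both copies converge to the same documents.
    shards.index(new IndexRequest(index.getName()).id("1").source("{\"foo\": \"bar\"}", XContentType.JSON));
    primaryShard.refresh("Test");
    replicateSegments(primaryShard, shards.getReplicas());
    shards.assertAllEqual(1);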
+
+    /**
+     * Starts a new primary shard in PrimaryMode and verifies that the shard publishes a checkpoint after refresh.
+     */
+    public void testPublishCheckpointOnPrimaryMode() throws IOException {
+        final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class);
+        IndexShard shard = newStartedShard(true);
+        CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, mock);
+        refreshListener.afterRefresh(true);
+
+        // verify checkpoint is published
+        verify(mock, times(1)).publish(any());
+        closeShards(shard);
+    }
+
+    /**
+     * Starts a new primary shard in PrimaryMode and begins a relocation handoff. Once the handoff completes, the shard
+     * is no longer in PrimaryMode, and we verify that it does not publish a checkpoint after refresh.
+     */
+    public void testPublishCheckpointAfterRelocationHandOff() throws IOException {
+        final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class);
+        IndexShard shard = newStartedShard(true);
+        CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, mock);
+        String id = shard.routingEntry().allocationId().getId();
+
+        // Starting relocation handoff
+        shard.getReplicationTracker().startRelocationHandoff(id);
+
+        // Completing relocation handoff
+        shard.getReplicationTracker().completeRelocationHandoff();
+        refreshListener.afterRefresh(true);
+
+        // verify checkpoint is not published
+        verify(mock, times(0)).publish(any());
+        closeShards(shard);
+    }
+
+}
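The two tests above pin down when `CheckpointRefreshListener` publishes: only while the shard still holds primary duties. A sketch of the gating they exercise, with the caveat that the real listener's checks may be broader:

    // Illustrative afterRefresh gating, not the exact OpenSearch listener code.
    public void afterRefresh(boolean didRefresh) {
        if (shard.getReplicationTracker().isPrimaryMode()) {
            publisher.publish(shard);  // replicas never publish checkpoints
        }
    }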
diff --git a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java
index 5bac40ab64d11..1f2360abde2ad 100644
--- a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java
+++ b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java
@@ -66,6 +66,7 @@
 import org.opensearch.index.shard.PrimaryReplicaSyncer;
 import org.opensearch.index.shard.ShardId;
 import org.opensearch.indices.recovery.PeerRecoveryTargetService;
+import org.opensearch.indices.replication.SegmentReplicationTargetService;
 import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
 import org.opensearch.repositories.RepositoriesService;
 import org.opensearch.threadpool.TestThreadPool;
@@ -570,6 +571,7 @@ private IndicesClusterStateService createIndicesClusterStateService(
             clusterService,
             threadPool,
             SegmentReplicationCheckpointPublisher.EMPTY,
+            SegmentReplicationTargetService.NO_OP,
             recoveryTargetService,
             shardStateAction,
             null,
diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java
index 5224a54a35e96..3ea74dbf38919 100644
--- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java
+++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java
@@ -71,6 +71,7 @@
 import org.opensearch.index.translog.Translog;
 import org.opensearch.indices.replication.common.ReplicationListener;
 import org.opensearch.indices.replication.common.ReplicationState;
+import org.opensearch.indices.replication.common.ReplicationType;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -103,6 +104,17 @@ public void testTranslogHistoryTransferred() throws Exception {
         }
     }
 
+    public void testWithSegmentReplication_ReplicaUsesPrimaryTranslogUUID() throws Exception {
+        Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
+        try (ReplicationGroup shards = createGroup(2, settings)) {
+            shards.startAll();
+            final String expectedUUID = getTranslog(shards.getPrimary()).getTranslogUUID();
+            assertTrue(
+                shards.getReplicas().stream().allMatch(indexShard -> getTranslog(indexShard).getTranslogUUID().equals(expectedUUID))
+            );
+        }
+    }
+
     public void testRetentionPolicyChangeDuringRecovery() throws Exception {
         try (ReplicationGroup shards = createGroup(0)) {
             shards.startPrimary();
diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java
index 260f6a13b5010..38c55620e1223 100644
--- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java
@@ -37,6 +37,7 @@
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
 
 public class OngoingSegmentReplicationsTests extends IndexShardTestCase {
@@ -126,7 +127,7 @@ public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) {
             @Override
             public void onFailure(Exception e) {
                 logger.error("Unexpected failure", e);
-                Assert.fail();
+                Assert.fail("Unexpected failure from startSegmentCopy listener: " + e);
             }
         });
     }
@@ -154,6 +155,9 @@ public void testCancelReplication() throws IOException {
     }
 
     public void testMultipleReplicasUseSameCheckpoint() throws IOException {
+        IndexShard secondReplica = newShard(primary.shardId(), false);
+        recoverReplica(secondReplica, primary, true);
+
         OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings);
         final CheckpointInfoRequest request = new CheckpointInfoRequest(
             1L,
@@ -171,7 +175,7 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException {
 
         final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest(
             1L,
-            replica.routingEntry().allocationId().getId(),
+            secondReplica.routingEntry().allocationId().getId(),
             replicaDiscoveryNode,
             testCheckpoint
         );
@@ -186,6 +190,7 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException {
         assertEquals(0, copyState.refCount());
         assertEquals(0, replications.size());
         assertEquals(0, replications.cachedCopyStateSize());
+        closeShards(secondReplica);
     }
 
     public void testStartCopyWithoutPrepareStep() {
@@ -228,4 +233,83 @@ public void testShardAlreadyReplicatingToNode() throws IOException {
         replications.prepareForReplication(request, segmentSegmentFileChunkWriter);
         assertThrows(OpenSearchException.class, () -> { replications.prepareForReplication(request, segmentSegmentFileChunkWriter); });
     }
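`testMultipleReplicasUseSameCheckpoint` above leans on `CopyState` being reference-counted: each replica that prepares against the same checkpoint takes a reference on the one cached instance, and the cache entry is released when the count returns to zero. Condensed from the assertions in these tests:

    // Two replicas, one checkpoint: a single cached CopyState, two references.
    final CopyState copyState = replications.prepareForReplication(request, fileChunkWriter);
    replications.prepareForReplication(secondRequest, fileChunkWriter);
    assertEquals(2, copyState.refCount());
    assertEquals(1, replications.cachedCopyStateSize());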
+
+    public void testStartReplicationWithNoFilesToFetch() throws IOException {
+        // create a replications object and request a checkpoint.
+        OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings));
+        final CheckpointInfoRequest request = new CheckpointInfoRequest(
+            1L,
+            replica.routingEntry().allocationId().getId(),
+            replicaDiscoveryNode,
+            testCheckpoint
+        );
+        // mock the FileChunkWriter so we can assert it is never called.
+        final FileChunkWriter segmentSegmentFileChunkWriter = mock(FileChunkWriter.class);
+        // Prepare for replication step - and ensure copyState is added to cache.
+        final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter);
+        assertTrue(replications.isInCopyStateMap(request.getCheckpoint()));
+        assertEquals(1, replications.size());
+        assertEquals(1, copyState.refCount());
+
+        getSegmentFilesRequest = new GetSegmentFilesRequest(
+            1L,
+            replica.routingEntry().allocationId().getId(),
+            replicaDiscoveryNode,
+            Collections.emptyList(),
+            testCheckpoint
+        );
+
+        // invoke startSegmentCopy and assert our fileChunkWriter is never invoked.
+        replications.startSegmentCopy(getSegmentFilesRequest, new ActionListener<>() {
+            @Override
+            public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) {
+                assertEquals(Collections.emptyList(), getSegmentFilesResponse.files);
+                assertEquals(0, copyState.refCount());
+                assertFalse(replications.isInCopyStateMap(request.getCheckpoint()));
+                verifyNoInteractions(segmentSegmentFileChunkWriter);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                logger.error("Unexpected failure", e);
+                Assert.fail("Unexpected failure from startSegmentCopy listener: " + e);
+            }
+        });
+    }
+
+    public void testCancelAllReplicationsForShard() throws IOException {
+        // This tests the case where the primary has multiple ongoing replications.
+        IndexShard replica_2 = newShard(primary.shardId(), false);
+        recoverReplica(replica_2, primary, true);
+
+        OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings);
+        final CheckpointInfoRequest request = new CheckpointInfoRequest(
+            1L,
+            replica.routingEntry().allocationId().getId(),
+            primaryDiscoveryNode,
+            testCheckpoint
+        );
+
+        final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class));
+        assertEquals(1, copyState.refCount());
+
+        final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest(
+            1L,
+            replica_2.routingEntry().allocationId().getId(),
+            replicaDiscoveryNode,
+            testCheckpoint
+        );
+        replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class));
+
+        assertEquals(2, copyState.refCount());
+        assertEquals(2, replications.size());
+        assertEquals(1, replications.cachedCopyStateSize());
+
+        // cancel the primary's ongoing replications.
+        replications.cancel(primary, "Test");
+        assertEquals(0, copyState.refCount());
+        assertEquals(0, replications.size());
+        assertEquals(0, replications.cachedCopyStateSize());
+        closeShards(replica_2);
+    }
 }
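When a checkpoint has no files to send, `startSegmentCopy` answers with an empty response and releases the cached state without ever touching the chunk writer; `verifyNoInteractions` is the Mockito assertion for exactly that "never touched" condition. The pattern in isolation, with a hypothetical `ChunkWriter` collaborator:

    interface ChunkWriter { void write(byte[] chunk); }  // hypothetical type

    ChunkWriter writer = mock(ChunkWriter.class);
    // ... exercise a path that should short-circuit before writing anything ...
    verifyNoInteractions(writer);  // fails if any method was invoked on the mock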
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java
index 70061c54d0da2..2c52772649acc 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java
@@ -15,7 +15,9 @@
 import org.opensearch.OpenSearchException;
 import org.opensearch.Version;
 import org.opensearch.action.ActionListener;
+import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.common.settings.Settings;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.IndexShardTestCase;
 import org.opensearch.index.store.StoreFileMetadata;
@@ -41,7 +43,8 @@ public class SegmentReplicationSourceHandlerTests extends IndexShardTestCase {
     @Override
     public void setUp() throws Exception {
         super.setUp();
-        primary = newStartedShard(true);
+        final Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT").put(Settings.EMPTY).build();
+        primary = newStartedShard(true, settings);
         replica = newShard(primary.shardId(), false);
         recoverReplica(replica, primary, true);
         replicaDiscoveryNode = replica.recoveryState().getTargetNode();
@@ -63,6 +66,7 @@ public void testSendFiles() throws IOException {
             chunkWriter,
             threadPool,
             copyState,
+            primary.routingEntry().allocationId().getId(),
             5000,
             1
         );
@@ -100,6 +104,7 @@ public void testSendFiles_emptyRequest() throws IOException {
             chunkWriter,
             threadPool,
             copyState,
+            primary.routingEntry().allocationId().getId(),
             5000,
             1
         );
@@ -138,6 +143,7 @@ public void testSendFileFails() throws IOException {
             chunkWriter,
             threadPool,
             copyState,
+            primary.routingEntry().allocationId().getId(),
             5000,
             1
         );
@@ -175,6 +181,7 @@ public void testReplicationAlreadyRunning() throws IOException {
             chunkWriter,
             threadPool,
             copyState,
+            primary.routingEntry().allocationId().getId(),
             5000,
             1
         );
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java
index 1d089c78159a6..2916f4c8152a2 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java
@@ -13,6 +13,7 @@
 import org.mockito.Mockito;
 import org.opensearch.OpenSearchException;
 import org.opensearch.action.ActionListener;
+import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.index.shard.IndexShard;
@@ -34,6 +35,7 @@
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.eq;
 
 public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {
@@ -49,7 +51,10 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {
     @Override
     public void setUp() throws Exception {
         super.setUp();
-        final Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build();
+        final Settings settings = Settings.builder()
+            .put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT")
+            .put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName())
+            .build();
         final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
         final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
         final TransportService transportService = mock(TransportService.class);
@@ -203,6 +208,57 @@ public void testNewCheckpoint_validationPassesAndReplicationFails() throws IOExc
         closeShard(indexShard, false);
     }
 
+    /**
+     * Starts a new shard in PrimaryMode and verifies that a checkpoint is not processed while the shard is in PrimaryMode.
+     */
+    public void testRejectCheckpointOnShardPrimaryMode() throws IOException {
+        SegmentReplicationTargetService spy = spy(sut);
+
+        // Starting a new shard in PrimaryMode.
+        IndexShard primaryShard = newStartedShard(true);
+        IndexShard spyShard = spy(primaryShard);
+        doNothing().when(spy).startReplication(any(), any(), any());
+        spy.onNewCheckpoint(aheadCheckpoint, spyShard);
+
+        // Verify that the checkpoint is not processed, as the shard is in PrimaryMode.
+        verify(spy, times(0)).startReplication(any(), any(), any());
+        closeShards(primaryShard);
+    }
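The rejection test relies on the target service ignoring checkpoints delivered to a shard that is itself the publishing primary. An illustrative shape of that guard; the service's real checks may be broader:

    public void onNewCheckpoint(ReplicationCheckpoint receivedCheckpoint, IndexShard shard) {
        if (shard.getReplicationTracker().isPrimaryMode()) {
            return;  // primaries publish checkpoints; they never replicate from one
        }
        // ... otherwise validate the checkpoint and start replication ...
    }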
+
+    public void testReplicationOnDone() throws IOException {
+        SegmentReplicationTargetService spy = spy(sut);
+        IndexShard spyShard = spy(indexShard);
+        ReplicationCheckpoint cp = indexShard.getLatestReplicationCheckpoint();
+        ReplicationCheckpoint newCheckpoint = new ReplicationCheckpoint(
+            cp.getShardId(),
+            cp.getPrimaryTerm(),
+            cp.getSegmentsGen(),
+            cp.getSeqNo(),
+            cp.getSegmentInfosVersion() + 1
+        );
+        ReplicationCheckpoint anotherNewCheckpoint = new ReplicationCheckpoint(
+            cp.getShardId(),
+            cp.getPrimaryTerm(),
+            cp.getSegmentsGen(),
+            cp.getSeqNo(),
+            cp.getSegmentInfosVersion() + 2
+        );
+        ArgumentCaptor<SegmentReplicationTargetService.SegmentReplicationListener> captor = ArgumentCaptor.forClass(
+            SegmentReplicationTargetService.SegmentReplicationListener.class
+        );
+        doNothing().when(spy).startReplication(any(), any(), any());
+        spy.onNewCheckpoint(newCheckpoint, spyShard);
+        spy.onNewCheckpoint(anotherNewCheckpoint, spyShard);
+        verify(spy, times(1)).startReplication(eq(newCheckpoint), any(), captor.capture());
+        verify(spy, times(1)).onNewCheckpoint(eq(anotherNewCheckpoint), any());
+        SegmentReplicationTargetService.SegmentReplicationListener listener = captor.getValue();
+        listener.onDone(new SegmentReplicationState(new ReplicationLuceneIndex()));
+        doNothing().when(spy).onNewCheckpoint(any(), any());
+        verify(spy, timeout(0).times(2)).onNewCheckpoint(eq(anotherNewCheckpoint), any());
+        closeShard(indexShard, false);
+
+    }
+
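`testReplicationOnDone` works by capturing the listener the service hands to `startReplication`, then completing it manually to prove that a newer checkpoint seen mid-replication gets reprocessed. The same capture-then-invoke Mockito pattern in isolation, with a hypothetical `TaskRunner` standing in for the stubbed collaborator:

    interface TaskRunner { void submit(Runnable task); }  // hypothetical type

    TaskRunner runner = mock(TaskRunner.class);
    ArgumentCaptor<Runnable> callback = ArgumentCaptor.forClass(Runnable.class);
    runner.submit(() -> {});                      // the code under test would do this
    verify(runner).submit(callback.capture());    // capture what was passed in
    callback.getValue().run();                    // complete the "async" work by hand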
     public void testBeforeIndexShardClosed_CancelsOngoingReplications() {
         final SegmentReplicationTarget target = new SegmentReplicationTarget(
             checkpoint,
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
index a0944ee249859..1157c463785ac 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
@@ -8,29 +8,51 @@
 
 package org.opensearch.indices.replication;
 
-import org.apache.lucene.index.IndexFileNames;
-import org.apache.lucene.index.IndexFormatTooNewException;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.IndexFormatTooNewException;
+import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.store.ByteBuffersDataOutput;
 import org.apache.lucene.store.ByteBuffersIndexOutput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.tests.analysis.MockAnalyzer;
+import org.apache.lucene.tests.util.TestUtil;
 import org.apache.lucene.util.Version;
 import org.junit.Assert;
 import org.mockito.Mockito;
+import org.opensearch.ExceptionsHelper;
 import org.opensearch.action.ActionListener;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.settings.Settings;
+import org.opensearch.index.IndexSettings;
 import org.opensearch.index.engine.NRTReplicationEngineFactory;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.IndexShardTestCase;
+import org.opensearch.index.shard.ShardId;
 import org.opensearch.index.store.Store;
 import org.opensearch.index.store.StoreFileMetadata;
+import org.opensearch.index.store.StoreTests;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.indices.replication.common.ReplicationType;
+import org.opensearch.test.DummyShardLock;
+import org.opensearch.test.IndexSettingsModule;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Random;
+import java.util.Arrays;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -69,6 +91,11 @@ public class SegmentReplicationTargetTests extends IndexShardTestCase {
         0
     );
 
+    private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings(
+        "index",
+        Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build()
+    );
+
     SegmentInfos testSegmentInfos;
 
     @Override
@@ -361,6 +388,113 @@ public void onFailure(Exception e) {
         });
     }
 
+    /**
+     * This test ensures that new files generated on the primary (due to a delete operation) are not considered missing on the replica
+     * @throws IOException on failure
+     */
+    public void test_MissingFiles_NotCausingFailure() throws IOException {
+        int docCount = 1 + random().nextInt(10);
+        // Generate a list of MetadataSnapshot containing two elements. The second snapshot contains extra files
+        // generated due to delete operations. These two snapshots can then be used in test to mock the primary shard
+        // snapshot (2nd element which contains delete operations) and replica's existing snapshot (1st element).
+        List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount);
+
+        SegmentReplicationSource segrepSource = new SegmentReplicationSource() {
+            @Override
+            public void getCheckpointMetadata(
+                long replicationId,
+                ReplicationCheckpoint checkpoint,
+                ActionListener<CheckpointInfoResponse> listener
+            ) {
+                listener.onResponse(
+                    new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1), buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))
+                );
+            }
+
+            @Override
+            public void getSegmentFiles(
+                long replicationId,
+                ReplicationCheckpoint checkpoint,
+                List<StoreFileMetadata> filesToFetch,
+                Store store,
+                ActionListener<GetSegmentFilesResponse> listener
+            ) {
+                listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
+            }
+        };
+        SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock(
+            SegmentReplicationTargetService.SegmentReplicationListener.class
+        );
+
+        segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener));
+        when(segrepTarget.getMetadataSnapshot()).thenReturn(storeMetadataSnapshots.get(0));
+        segrepTarget.startReplication(new ActionListener<Void>() {
+            @Override
+            public void onResponse(Void replicationResponse) {
+                logger.info("No error processing checkpoint info");
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                assert (e instanceof IllegalStateException);
+            }
+        });
+    }
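The reason files present only on the primary do not fail the copy is that the target compares snapshots with `Store.MetadataSnapshot#recoveryDiff`, which (roughly) buckets files into identical, different, and missing by name and checksum; the `.liv` files written by the delete simply land in the to-fetch set rather than being treated as corruption. Condensed:

    // Files only present on the primary side show up in diff.missing and are
    // fetched from the source, not flagged as an error on the replica.
    final Store.RecoveryDiff diff = primarySnapshot.recoveryDiff(replicaSnapshot);
    final List<StoreFileMetadata> toFetch = diff.missing;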
+
+    /**
+     * Generates a list of Store.MetadataSnapshot with two elements, where the second snapshot has extra files due to a delete
+     * operation. A list of snapshots is returned so that identical files have the same checksum.
+     * @param docCount number of documents to index
+     * @return list of two metadata snapshots: the first without deletes, the second with deletes
+     * @throws IOException if the temporary index cannot be written
+     */
+    List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount) throws IOException {
+        List<Document> docList = new ArrayList<>();
+        for (int i = 0; i < docCount; i++) {
+            Document document = new Document();
+            String text = new String(new char[] { (char) (97 + i), (char) (97 + i) });
+            document.add(new StringField("id", "" + i, random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
+            document.add(new TextField("str", text, Field.Store.YES));
+            docList.add(document);
+        }
+        long seed = random().nextLong();
+        Random random = new Random(seed);
+        IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random)).setCodec(TestUtil.getDefaultCodec());
+        iwc.setMergePolicy(NoMergePolicy.INSTANCE);
+        iwc.setUseCompoundFile(true);
+        final ShardId shardId = new ShardId("index", "_na_", 1);
+        Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
+        IndexWriter writer = new IndexWriter(store.directory(), iwc);
+        for (Document d : docList) {
+            writer.addDocument(d);
+        }
+        writer.commit();
+        Store.MetadataSnapshot storeMetadata = store.getMetadata();
+        // Delete one document to generate a .liv file
+        writer.deleteDocuments(new Term("id", Integer.toString(random().nextInt(docCount))));
+        writer.commit();
+        Store.MetadataSnapshot storeMetadataWithDeletes = store.getMetadata();
+        deleteContent(store.directory());
+        writer.close();
+        store.close();
+        return Arrays.asList(storeMetadata, storeMetadataWithDeletes);
+    }
+
+    public static void deleteContent(Directory directory) throws IOException {
+        final String[] files = directory.listAll();
+        final List<IOException> exceptions = new ArrayList<>();
+        for (String file : files) {
+            try {
+                directory.deleteFile(file);
+            } catch (NoSuchFileException | FileNotFoundException e) {
+                // ignore
+            } catch (IOException e) {
+                exceptions.add(e);
+            }
+        }
+        ExceptionsHelper.rethrowAndSuppress(exceptions);
+    }
+
     @Override
     public void tearDown() throws Exception {
         super.tearDown();
diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
index a100e84dcf8ae..e9ef5ba30c865 100644
--- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
+++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
@@ -183,6 +183,8 @@
 import org.opensearch.indices.recovery.PeerRecoverySourceService;
 import org.opensearch.indices.recovery.PeerRecoveryTargetService;
 import org.opensearch.indices.recovery.RecoverySettings;
+import org.opensearch.indices.replication.SegmentReplicationSourceFactory;
+import org.opensearch.indices.replication.SegmentReplicationTargetService;
 import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
 import org.opensearch.ingest.IngestService;
 import org.opensearch.monitor.StatusInfo;
@@ -1847,6 +1849,12 @@ public void onFailure(final Exception e) {
                     clusterService,
                     threadPool,
                     new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService),
+                    new SegmentReplicationTargetService(
+                        threadPool,
+                        recoverySettings,
+                        transportService,
+                        new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService)
+                    ),
                     shardStateAction,
                     new NodeMappingRefreshAction(transportService, metadataMappingService),
                     repositoriesService,
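Note the two ways the new constructor slot is filled across this patch: the resiliency test above wires a fully functional target service, while feature-flag-off paths pass the placeholder. Both forms are grounded in the diffs in this patch:

    // Functional service, as wired in SnapshotResiliencyTests:
    SegmentReplicationTargetService targetService = new SegmentReplicationTargetService(
        threadPool,
        recoverySettings,
        transportService,
        new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService)
    );
    // Placeholder when segment replication is disabled:
    SegmentReplicationTargetService noOp = SegmentReplicationTargetService.NO_OP;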
diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java
index c7ac9fe889a9f..57368bed9b7ac 100644
--- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java
@@ -140,6 +140,21 @@ protected ReplicationGroup createGroup(int replicas, Settings settings) throws I
         return new ReplicationGroup(metadata);
     }
 
+    protected ReplicationGroup createGroup(int replicas, Settings settings, EngineFactory engineFactory) throws IOException {
+        return createGroup(replicas, settings, indexMapping, engineFactory);
+    }
+
+    protected ReplicationGroup createGroup(int replicas, Settings settings, String mappings, EngineFactory engineFactory)
+        throws IOException {
+        IndexMetadata metadata = buildIndexMetadata(replicas, settings, mappings);
+        return new ReplicationGroup(metadata) {
+            @Override
+            protected EngineFactory getEngineFactory(ShardRouting routing) {
+                return engineFactory;
+            }
+        };
+    }
+
     protected IndexMetadata buildIndexMetadata(int replicas) throws IOException {
         return buildIndexMetadata(replicas, indexMapping);
     }
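These overloads let a test inject an alternative engine per shard by overriding `getEngineFactory` on the group. The segment-replication tests earlier in this patch use them to run shards on the NRT engine:

    // From SegmentReplicationIndexShardTests above: a one-replica group whose
    // shards are built with the NRT replication engine factory.
    try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
        shards.startAll();
        // ... index, refresh, replicateSegments, assert ...
    }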
diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
index 371fa6d102304..cad6579ac941d 100644
--- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
@@ -33,9 +33,15 @@
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.junit.Assert;
+import org.opensearch.ExceptionsHelper;
 import org.opensearch.OpenSearchException;
 import org.opensearch.Version;
+import org.opensearch.action.ActionListener;
 import org.opensearch.action.admin.indices.flush.FlushRequest;
 import org.opensearch.action.index.IndexRequest;
 import org.opensearch.action.support.PlainActionFuture;
@@ -58,6 +64,7 @@
 import org.opensearch.common.lucene.uid.Versions;
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
 import org.opensearch.common.util.BigArrays;
 import org.opensearch.common.xcontent.XContentType;
 import org.opensearch.core.internal.io.IOUtils;
@@ -82,6 +89,7 @@
 import org.opensearch.index.similarity.SimilarityService;
 import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
 import org.opensearch.index.store.Store;
+import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.index.translog.Translog;
 import org.opensearch.indices.breaker.CircuitBreakerService;
 import org.opensearch.indices.breaker.HierarchyCircuitBreakerService;
@@ -94,7 +102,14 @@
 import org.opensearch.indices.recovery.RecoveryState;
 import org.opensearch.indices.recovery.RecoveryTarget;
 import org.opensearch.indices.recovery.StartRecoveryRequest;
+import org.opensearch.indices.replication.CheckpointInfoResponse;
+import org.opensearch.indices.replication.GetSegmentFilesResponse;
+import org.opensearch.indices.replication.SegmentReplicationSource;
+import org.opensearch.indices.replication.SegmentReplicationTarget;
+import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
+import org.opensearch.indices.replication.common.CopyState;
+import org.opensearch.indices.replication.common.ReplicationCollection;
 import org.opensearch.indices.replication.common.ReplicationListener;
 import org.opensearch.indices.replication.common.ReplicationState;
 import org.opensearch.repositories.IndexId;
@@ -112,6 +127,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -122,6 +138,7 @@
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.mockito.Mockito.mock;
 import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting;
 
 /**
@@ -1116,4 +1133,118 @@ public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) {
         }
     };
 }
+
+    /**
+     * Segment replication specific test method - replicates segments to a list of replicas from a given primary.
+     * This helper uses a real {@link SegmentReplicationTarget} for each replica with a mock {@link SegmentReplicationSource} that
+     * writes all segments directly to the target.
+     */
+    public final void replicateSegments(IndexShard primaryShard, List<IndexShard> replicaShards) throws IOException, InterruptedException {
+        final CountDownLatch countDownLatch = new CountDownLatch(replicaShards.size());
+        Store.MetadataSnapshot primaryMetadata;
+        try (final GatedCloseable<SegmentInfos> segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) {
+            final SegmentInfos primarySegmentInfos = segmentInfosSnapshot.get();
+            primaryMetadata = primaryShard.store().getMetadata(primarySegmentInfos);
+        }
+        final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primaryShard.shardId), primaryShard);
+
+        final ReplicationCollection<SegmentReplicationTarget> replicationCollection = new ReplicationCollection<>(logger, threadPool);
+        final SegmentReplicationSource source = new SegmentReplicationSource() {
+            @Override
+            public void getCheckpointMetadata(
+                long replicationId,
+                ReplicationCheckpoint checkpoint,
+                ActionListener<CheckpointInfoResponse> listener
+            ) {
+                listener.onResponse(
+                    new CheckpointInfoResponse(
+                        copyState.getCheckpoint(),
+                        copyState.getMetadataSnapshot(),
+                        copyState.getInfosBytes(),
+                        copyState.getPendingDeleteFiles()
+                    )
+                );
+            }
+
+            @Override
+            public void getSegmentFiles(
+                long replicationId,
+                ReplicationCheckpoint checkpoint,
+                List<StoreFileMetadata> filesToFetch,
+                Store store,
+                ActionListener<GetSegmentFilesResponse> listener
+            ) {
+                try (
+                    final ReplicationCollection.ReplicationRef<SegmentReplicationTarget> replicationRef = replicationCollection.get(
+                        replicationId
+                    )
+                ) {
+                    writeFileChunks(replicationRef.get(), primaryShard, filesToFetch.toArray(new StoreFileMetadata[] {}));
+                } catch (IOException e) {
+                    listener.onFailure(e);
+                    return; // avoid notifying the listener twice
+                }
+                listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
+            }
+        };
+
+        for (IndexShard replica : replicaShards) {
+            final SegmentReplicationTarget target = new SegmentReplicationTarget(
+                ReplicationCheckpoint.empty(replica.shardId),
+                replica,
+                source,
+                new ReplicationListener() {
+                    @Override
+                    public void onDone(ReplicationState state) {
+                        try (final GatedCloseable<SegmentInfos> snapshot = replica.getSegmentInfosSnapshot()) {
+                            final SegmentInfos replicaInfos = snapshot.get();
+                            final Store.MetadataSnapshot replicaMetadata = replica.store().getMetadata(replicaInfos);
+                            final Store.RecoveryDiff recoveryDiff = primaryMetadata.recoveryDiff(replicaMetadata);
+                            assertTrue(recoveryDiff.missing.isEmpty());
+                            assertTrue(recoveryDiff.different.isEmpty());
+                            assertEquals(recoveryDiff.identical.size(), primaryMetadata.size());
+                            assertEquals(primaryMetadata.getCommitUserData(), replicaMetadata.getCommitUserData());
+                        } catch (Exception e) {
+                            throw ExceptionsHelper.convertToRuntime(e);
+                        }
+                        countDownLatch.countDown();
+                    }
+
+                    @Override
+                    public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
+                        logger.error("Unexpected replication failure in test", e);
+                        Assert.fail("test replication should not fail: " + e);
+                    }
+                }
+            );
+            replicationCollection.start(target, TimeValue.timeValueMillis(5000));
+            target.startReplication(new ActionListener<>() {
+                @Override
+                public void onResponse(Void o) {
+                    replicationCollection.markAsDone(target.getId());
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    replicationCollection.fail(target.getId(), new OpenSearchException("Segment Replication failed", e), true);
+                }
+            });
+        }
+        assertTrue("Timed out waiting for replication to complete", countDownLatch.await(3, TimeUnit.SECONDS));
+    }
+
+    private void writeFileChunks(SegmentReplicationTarget target, IndexShard primary, StoreFileMetadata[] files) throws IOException {
+        for (StoreFileMetadata md : files) {
+            try (IndexInput in = primary.store().directory().openInput(md.name(), IOContext.READONCE)) {
+                int pos = 0;
+                while (pos < md.length()) {
+                    int length = between(1, Math.toIntExact(md.length() - pos));
+                    byte[] buffer = new byte[length];
+                    in.readBytes(buffer, 0, length);
+                    target.writeFileChunk(md, pos, new BytesArray(buffer), pos + length == md.length(), 0, mock(ActionListener.class));
+                    pos += length;
+                }
+            }
+        }
+    }
 }