diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index af175be286b13..c94313d2b8cee 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -193,6 +193,18 @@ protected ReferenceManager getReferenceManager(Search return readerManager; } + /** + * Refreshing of this engine will only happen internally when a new set of segments is received. The engine will ignore external + * refresh attempts so we can return false here. Further Engine's existing implementation reads DirectoryReader.isCurrent after acquiring a searcher. + * With this Engine's NRTReplicationReaderManager, This will use StandardDirectoryReader's implementation which determines if the reader is current by + * comparing the on-disk SegmentInfos version against the one in the reader, which at refresh points will always return isCurrent false and then refreshNeeded true. + * Even if this method returns refresh as needed, we ignore it and only ever refresh with incoming SegmentInfos. + */ + @Override + public boolean refreshNeeded() { + return false; + } + @Override public Closeable acquireHistoryRetentionLock() { throw new UnsupportedOperationException("Not implemented"); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 9694f3dd37f80..b05eec4304cd5 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3778,6 +3778,10 @@ public boolean scheduledRefresh() { if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it && isSearchIdle() && indexSettings.isExplicitRefresh() == false + && indexSettings.isSegRepEnabled() == false + // Indices with segrep enabled will never wait on a refresh and ignore shard idle. Primary shards push out new segments only + // after a refresh, so we don't want to wait for a search to trigger that cycle. Replicas will only refresh after receiving + // a new set of segments. && active.get()) { // it must be active otherwise we might not free up segment memory once the shard became inactive // lets skip this refresh since we are search idle and // don't necessarily need to refresh. the next searcher access will register a refreshListener and that will diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java new file mode 100644 index 0000000000000..3fcf6116b11a2 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -0,0 +1,59 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; +import org.opensearch.indices.replication.common.ReplicationType; + +public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase { + + private static final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + + public void testIgnoreShardIdle() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + final int numDocs = shards.indexDocs(randomInt(10)); + primary.refresh("test"); + replicateSegments(primary, shards.getReplicas()); + shards.assertAllEqual(numDocs); + + primary.scheduledRefresh(); + replica.scheduledRefresh(); + + primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); + replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); + + // Update the search_idle setting, this will put both shards into search idle. + Settings updatedSettings = Settings.builder() + .put(settings) + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO) + .build(); + primary.indexSettings().getScopedSettings().applySettings(updatedSettings); + replica.indexSettings().getScopedSettings().applySettings(updatedSettings); + + primary.scheduledRefresh(); + replica.scheduledRefresh(); + + // Shards without segrep will register a new RefreshListener on the engine and return true when registered, + // assert with segrep enabled that awaitShardSearchActive does not register a listener. + primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); + replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); + } + } +} diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index 5e22a7c145a39..a588909085160 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -139,6 +139,16 @@ protected ReplicationGroup createGroup(int replicas, Settings settings) throws I return new ReplicationGroup(metadata); } + protected ReplicationGroup createGroup(int replicas, Settings settings, EngineFactory engineFactory) throws IOException { + IndexMetadata metadata = buildIndexMetadata(replicas, settings, indexMapping); + return new ReplicationGroup(metadata) { + @Override + protected EngineFactory getEngineFactory(ShardRouting routing) { + return engineFactory; + } + }; + } + protected IndexMetadata buildIndexMetadata(int replicas) throws IOException { return buildIndexMetadata(replicas, indexMapping); } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index c5ee54450cce2..7dedc572ff19b 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -33,9 +33,15 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.junit.Assert; +import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.Version; +import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.PlainActionFuture; @@ -58,6 +64,7 @@ import org.opensearch.common.lucene.uid.Versions; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.internal.io.IOUtils; @@ -82,6 +89,7 @@ import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.breaker.HierarchyCircuitBreakerService; @@ -94,7 +102,14 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.recovery.StartRecoveryRequest; +import org.opensearch.indices.replication.CheckpointInfoResponse; +import org.opensearch.indices.replication.GetSegmentFilesResponse; +import org.opensearch.indices.replication.SegmentReplicationSource; +import org.opensearch.indices.replication.SegmentReplicationTarget; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; +import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.indices.replication.common.ReplicationCollection; import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.repositories.IndexId; @@ -112,6 +127,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -122,6 +138,7 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; +import static org.mockito.Mockito.mock; import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; /** @@ -1133,4 +1150,117 @@ public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) { } }; } + + /** + * Segment Replication specific test method - Replicate segments to a list of replicas from a given primary. + * This test will use a real {@link SegmentReplicationTarget} for each replica with a mock {@link SegmentReplicationSource} that + * writes all segments directly to the target. + */ + public final void replicateSegments(IndexShard primaryShard, List replicaShards) throws IOException, InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(replicaShards.size()); + Store.MetadataSnapshot primaryMetadata; + try (final GatedCloseable segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) { + final SegmentInfos primarySegmentInfos = segmentInfosSnapshot.get(); + primaryMetadata = primaryShard.store().getMetadata(primarySegmentInfos); + } + final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primaryShard.shardId), primaryShard); + + final ReplicationCollection replicationCollection = new ReplicationCollection<>(logger, threadPool); + final SegmentReplicationSource source = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onResponse( + new CheckpointInfoResponse( + copyState.getCheckpoint(), + copyState.getMetadataSnapshot(), + copyState.getInfosBytes(), + copyState.getPendingDeleteFiles() + ) + ); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + try ( + final ReplicationCollection.ReplicationRef replicationRef = replicationCollection.get( + replicationId + ) + ) { + writeFileChunks(replicationRef.get(), primaryShard, filesToFetch.toArray(new StoreFileMetadata[] {})); + } catch (IOException e) { + listener.onFailure(e); + } + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } + }; + + for (IndexShard replica : replicaShards) { + final SegmentReplicationTarget target = new SegmentReplicationTarget( + ReplicationCheckpoint.empty(replica.shardId), + replica, + source, + new ReplicationListener() { + @Override + public void onDone(ReplicationState state) { + try (final GatedCloseable snapshot = replica.getSegmentInfosSnapshot()) { + final SegmentInfos replicaInfos = snapshot.get(); + final Store.MetadataSnapshot replicaMetadata = replica.store().getMetadata(replicaInfos); + final Store.RecoveryDiff recoveryDiff = primaryMetadata.recoveryDiff(replicaMetadata); + assertTrue(recoveryDiff.missing.isEmpty()); + assertTrue(recoveryDiff.different.isEmpty()); + assertEquals(recoveryDiff.identical.size(), primaryMetadata.size()); + assertEquals(primaryMetadata.getCommitUserData(), replicaMetadata.getCommitUserData()); + } catch (Exception e) { + throw ExceptionsHelper.convertToRuntime(e); + } + countDownLatch.countDown(); + } + + @Override + public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + logger.error("Unexpected replication failure in test", e); + Assert.fail("test replication should not fail: " + e); + } + } + ); + replicationCollection.start(target, TimeValue.timeValueMillis(5000)); + target.startReplication(new ActionListener<>() { + @Override + public void onResponse(Void o) { + replicationCollection.markAsDone(target.getId()); + } + + @Override + public void onFailure(Exception e) { + replicationCollection.fail(target.getId(), new OpenSearchException("Segment Replication failed", e), true); + } + }); + } + countDownLatch.await(3, TimeUnit.SECONDS); + } + + private void writeFileChunks(SegmentReplicationTarget target, IndexShard primary, StoreFileMetadata[] files) throws IOException { + for (StoreFileMetadata md : files) { + try (IndexInput in = primary.store().directory().openInput(md.name(), IOContext.READONCE)) { + int pos = 0; + while (pos < md.length()) { + int length = between(1, Math.toIntExact(md.length() - pos)); + byte[] buffer = new byte[length]; + in.readBytes(buffer, 0, length); + target.writeFileChunk(md, pos, new BytesArray(buffer), pos + length == md.length(), 0, mock(ActionListener.class)); + pos += length; + } + } + } + } }