From e6acd60f89b50764dc9c620b2ca154ff22663163 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Thu, 4 Aug 2022 09:23:44 -0700 Subject: [PATCH] Disable shard idle with segment replication. (#4118) This change disables shard idle when segment replication is enabled. Primary shards will only push out new segments on refresh, so we do not want to block this based on search behavior. Replicas will only refresh on an externally provided SegmentInfos, so we do not want search requests to hang waiting for a refresh. Signed-off-by: Marc Handalian --- .../index/engine/NRTReplicationEngine.java | 12 ++ .../opensearch/index/shard/IndexShard.java | 4 + .../SegmentReplicationIndexShardTests.java | 59 ++++++++ ...enSearchIndexLevelReplicationTestCase.java | 10 ++ .../index/shard/IndexShardTestCase.java | 130 ++++++++++++++++++ 5 files changed, 215 insertions(+) create mode 100644 server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 414426921201f..0f8749248a724 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -199,6 +199,18 @@ protected ReferenceManager getReferenceManager(Search return readerManager; } + /** + * Refreshing of this engine will only happen internally when a new set of segments is received. The engine will ignore external + * refresh attempts so we can return false here. Further, Engine's existing implementation reads DirectoryReader.isCurrent after acquiring a searcher.
+ * With this Engine's NRTReplicationReaderManager, this will use StandardDirectoryReader's implementation which determines if the reader is current by + * comparing the on-disk SegmentInfos version against the one in the reader, which at refresh points will always return isCurrent false and then refreshNeeded true. + * Even if the inherited implementation reported a refresh as needed, we would ignore it and only ever refresh with incoming SegmentInfos. + */ + @Override + public boolean refreshNeeded() { + return false; + } + @Override public boolean isTranslogSyncNeeded() { return translogManager.getTranslog().syncNeeded(); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 1bcedf28e02de..7c1e3150f7a9e 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3770,6 +3770,10 @@ public boolean scheduledRefresh() { if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it && isSearchIdle() && indexSettings.isExplicitRefresh() == false + && indexSettings.isSegRepEnabled() == false + // Indices with segrep enabled will never wait on a refresh and ignore shard idle. Primary shards push out new segments only + // after a refresh, so we don't want to wait for a search to trigger that cycle. Replicas will only refresh after receiving + // a new set of segments. && active.get()) { // it must be active otherwise we might not free up segment memory once the shard became inactive // lets skip this refresh since we are search idle and // don't necessarily need to refresh.
the next searcher access will register a refreshListener and that will diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java new file mode 100644 index 0000000000000..3fcf6116b11a2 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -0,0 +1,59 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; +import org.opensearch.indices.replication.common.ReplicationType; + +public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase { + + private static final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + + public void testIgnoreShardIdle() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + final int numDocs = shards.indexDocs(randomInt(10)); + primary.refresh("test"); + replicateSegments(primary, shards.getReplicas()); + shards.assertAllEqual(numDocs); + + primary.scheduledRefresh(); + replica.scheduledRefresh(); + + primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); + 
replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); + + // Update the search_idle setting, this will put both shards into search idle. + Settings updatedSettings = Settings.builder() + .put(settings) + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO) + .build(); + primary.indexSettings().getScopedSettings().applySettings(updatedSettings); + replica.indexSettings().getScopedSettings().applySettings(updatedSettings); + + primary.scheduledRefresh(); + replica.scheduledRefresh(); + + // Shards without segrep will register a new RefreshListener on the engine and return true when registered, + // assert with segrep enabled that awaitShardSearchActive does not register a listener. + primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); + replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); + } + } +} diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index c7ac9fe889a9f..11aeb0b538fe3 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -140,6 +140,16 @@ protected ReplicationGroup createGroup(int replicas, Settings settings) throws I return new ReplicationGroup(metadata); } + protected ReplicationGroup createGroup(int replicas, Settings settings, EngineFactory engineFactory) throws IOException { + IndexMetadata metadata = buildIndexMetadata(replicas, settings, indexMapping); + return new ReplicationGroup(metadata) { + @Override + protected EngineFactory getEngineFactory(ShardRouting routing) { + return engineFactory; + } + }; + } + protected 
IndexMetadata buildIndexMetadata(int replicas) throws IOException { return buildIndexMetadata(replicas, indexMapping); } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 371fa6d102304..cad6579ac941d 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -33,9 +33,15 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.junit.Assert; +import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.Version; +import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.PlainActionFuture; @@ -58,6 +64,7 @@ import org.opensearch.common.lucene.uid.Versions; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.internal.io.IOUtils; @@ -82,6 +89,7 @@ import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.breaker.HierarchyCircuitBreakerService; @@ -94,7 +102,14 @@ import org.opensearch.indices.recovery.RecoveryState; 
import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.recovery.StartRecoveryRequest; +import org.opensearch.indices.replication.CheckpointInfoResponse; +import org.opensearch.indices.replication.GetSegmentFilesResponse; +import org.opensearch.indices.replication.SegmentReplicationSource; +import org.opensearch.indices.replication.SegmentReplicationTarget; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; +import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.indices.replication.common.ReplicationCollection; import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.repositories.IndexId; @@ -112,6 +127,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -122,6 +138,7 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; +import static org.mockito.Mockito.mock; import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; /** @@ -1116,4 +1133,117 @@ public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) { } }; } + + /** + * Segment Replication specific test method - Replicate segments to a list of replicas from a given primary. + * This test will use a real {@link SegmentReplicationTarget} for each replica with a mock {@link SegmentReplicationSource} that + * writes all segments directly to the target. 
+ */ + public final void replicateSegments(IndexShard primaryShard, List replicaShards) throws IOException, InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(replicaShards.size()); + Store.MetadataSnapshot primaryMetadata; + try (final GatedCloseable segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) { + final SegmentInfos primarySegmentInfos = segmentInfosSnapshot.get(); + primaryMetadata = primaryShard.store().getMetadata(primarySegmentInfos); + } + final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primaryShard.shardId), primaryShard); + + final ReplicationCollection replicationCollection = new ReplicationCollection<>(logger, threadPool); + final SegmentReplicationSource source = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onResponse( + new CheckpointInfoResponse( + copyState.getCheckpoint(), + copyState.getMetadataSnapshot(), + copyState.getInfosBytes(), + copyState.getPendingDeleteFiles() + ) + ); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + try ( + final ReplicationCollection.ReplicationRef replicationRef = replicationCollection.get( + replicationId + ) + ) { + writeFileChunks(replicationRef.get(), primaryShard, filesToFetch.toArray(new StoreFileMetadata[] {})); + } catch (IOException e) { + listener.onFailure(e); + } + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } + }; + + for (IndexShard replica : replicaShards) { + final SegmentReplicationTarget target = new SegmentReplicationTarget( + ReplicationCheckpoint.empty(replica.shardId), + replica, + source, + new ReplicationListener() { + @Override + public void onDone(ReplicationState state) { + try (final GatedCloseable snapshot = 
replica.getSegmentInfosSnapshot()) { + final SegmentInfos replicaInfos = snapshot.get(); + final Store.MetadataSnapshot replicaMetadata = replica.store().getMetadata(replicaInfos); + final Store.RecoveryDiff recoveryDiff = primaryMetadata.recoveryDiff(replicaMetadata); + assertTrue(recoveryDiff.missing.isEmpty()); + assertTrue(recoveryDiff.different.isEmpty()); + assertEquals(recoveryDiff.identical.size(), primaryMetadata.size()); + assertEquals(primaryMetadata.getCommitUserData(), replicaMetadata.getCommitUserData()); + } catch (Exception e) { + throw ExceptionsHelper.convertToRuntime(e); + } + countDownLatch.countDown(); + } + + @Override + public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + logger.error("Unexpected replication failure in test", e); + Assert.fail("test replication should not fail: " + e); + } + } + ); + replicationCollection.start(target, TimeValue.timeValueMillis(5000)); + target.startReplication(new ActionListener<>() { + @Override + public void onResponse(Void o) { + replicationCollection.markAsDone(target.getId()); + } + + @Override + public void onFailure(Exception e) { + replicationCollection.fail(target.getId(), new OpenSearchException("Segment Replication failed", e), true); + } + }); + } + countDownLatch.await(3, TimeUnit.SECONDS); + } + + private void writeFileChunks(SegmentReplicationTarget target, IndexShard primary, StoreFileMetadata[] files) throws IOException { + for (StoreFileMetadata md : files) { + try (IndexInput in = primary.store().directory().openInput(md.name(), IOContext.READONCE)) { + int pos = 0; + while (pos < md.length()) { + int length = between(1, Math.toIntExact(md.length() - pos)); + byte[] buffer = new byte[length]; + in.readBytes(buffer, 0, length); + target.writeFileChunk(md, pos, new BytesArray(buffer), pos + length == md.length(), 0, mock(ActionListener.class)); + pos += length; + } + } + } + } }