From af8b4e902cf4b0bd25a9590f0f813837df9bcc9d Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Wed, 4 Sep 2024 22:09:47 -0700 Subject: [PATCH] [Backport 2.x] [RW Separation] Add polling segment replication for search replicas (#15627) (#15717) --- CHANGELOG.md | 2 +- .../SearchReplicaReplicationIT.java | 85 +++++++++ .../org/opensearch/index/IndexModule.java | 54 +++++- .../org/opensearch/index/IndexService.java | 73 +++++++- .../index/seqno/ReplicationTracker.java | 11 +- .../opensearch/indices/IndicesService.java | 12 +- .../SegmentReplicationTargetService.java | 1 + .../replication/SegmentReplicator.java | 35 +++- .../main/java/org/opensearch/node/Node.java | 7 +- .../opensearch/index/IndexModuleTests.java | 3 +- .../opensearch/index/IndexServiceTests.java | 53 ++++++ .../replication/SegmentReplicatorTests.java | 163 ++++++++++++++++++ 12 files changed, 482 insertions(+), 17 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java create mode 100644 server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 07cbbbe020286..6713403a4e339 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,7 +39,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494)) - [Range Queries] Add new approximateable query framework to short-circuit range queries ([#13788](https://github.com/opensearch-project/OpenSearch/pull/13788)) - [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527)) -- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410)) +- [Reader Writer Separation] Add experimental search replica shard type to achieve reader writer separation ([#15237](https://github.com/opensearch-project/OpenSearch/pull/15237)) - Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290)) - [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291))) - Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426)) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java new file mode 100644 index 0000000000000..a1b512c326ac5 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.After; +import org.junit.Before; + +import java.nio.file.Path; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT { + + private static final String REPOSITORY_NAME = "test-remote-store-repo"; + protected Path absolutePath; + + private Boolean useRemoteStore; + + @Before + public void randomizeRemoteStoreEnabled() { + useRemoteStore = randomBoolean(); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + if (useRemoteStore) { + if (absolutePath == null) { + absolutePath = randomRepoPath().toAbsolutePath(); + } + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath)) + .build(); + } + return super.nodeSettings(nodeOrdinal); + } + + @After + public void teardown() { + if (useRemoteStore) { + clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get(); + } + } + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) + .build(); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build(); + } + + public void testReplication() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final String primary = internalCluster().startDataOnlyNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replica = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + + final int docCount = 10; + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + waitForSearchableDocs(docCount, primary, replica); + } + +} diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 33897599fdd51..1725afcad72cf 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -73,6 +73,7 @@ import org.opensearch.index.engine.EngineFactory; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.IndexEventListener; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexingOperationListener; import org.opensearch.index.shard.SearchOperationListener; import org.opensearch.index.similarity.SimilarityService; @@ -729,6 +730,56 @@ public IndexService newIndexService( Supplier clusterDefaultRefreshIntervalSupplier, RecoverySettings recoverySettings, RemoteStoreSettings remoteStoreSettings + ) throws IOException { + return newIndexService( + indexCreationContext, + environment, + xContentRegistry, + shardStoreDeleter, + circuitBreakerService, + bigArrays, + threadPool, + scriptService, + clusterService, + client, + indicesQueryCache, + mapperRegistry, + indicesFieldDataCache, + namedWriteableRegistry, + idFieldDataEnabled, + valuesSourceRegistry, + remoteDirectoryFactory, + translogFactorySupplier, + clusterDefaultRefreshIntervalSupplier, + recoverySettings, + remoteStoreSettings, + (s) -> {} + ); + } + + public IndexService newIndexService( + IndexService.IndexCreationContext indexCreationContext, + NodeEnvironment environment, + NamedXContentRegistry xContentRegistry, + IndexService.ShardStoreDeleter shardStoreDeleter, + CircuitBreakerService circuitBreakerService, + BigArrays bigArrays, + ThreadPool threadPool, + ScriptService scriptService, + ClusterService clusterService, + Client client, + IndicesQueryCache indicesQueryCache, + MapperRegistry mapperRegistry, + IndicesFieldDataCache indicesFieldDataCache, + NamedWriteableRegistry namedWriteableRegistry, + BooleanSupplier idFieldDataEnabled, + ValuesSourceRegistry valuesSourceRegistry, + IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, + BiFunction translogFactorySupplier, + Supplier clusterDefaultRefreshIntervalSupplier, + RecoverySettings recoverySettings, + RemoteStoreSettings remoteStoreSettings, + Consumer replicator ) throws IOException { final IndexEventListener eventListener = freeze(); Function> readerWrapperFactory = indexReaderWrapper @@ -789,7 +840,8 @@ public IndexService newIndexService( recoverySettings, remoteStoreSettings, fileCache, - compositeIndexSettings + compositeIndexSettings, + replicator ); success = true; return indexService; diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 3d1bc2851069f..6e67c8f83ee4c 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -136,6 +136,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; import static org.opensearch.common.collect.MapBuilder.newMapBuilder; +import static org.opensearch.common.util.FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING; import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasRemoteStoreSettings; /** @@ -174,6 +175,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private volatile AsyncTranslogFSync fsyncTask; private volatile AsyncGlobalCheckpointTask globalCheckpointTask; private volatile AsyncRetentionLeaseSyncTask retentionLeaseSyncTask; + private volatile AsyncReplicationTask asyncReplicationTask; // don't convert to Setting<> and register... we only set this in tests and register via a plugin private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval"; @@ -194,6 +196,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final RemoteStoreSettings remoteStoreSettings; private final FileCache fileCache; private final CompositeIndexSettings compositeIndexSettings; + private final Consumer replicator; public IndexService( IndexSettings indexSettings, @@ -231,7 +234,8 @@ public IndexService( RecoverySettings recoverySettings, RemoteStoreSettings remoteStoreSettings, FileCache fileCache, - CompositeIndexSettings compositeIndexSettings + CompositeIndexSettings compositeIndexSettings, + Consumer replicator ) { super(indexSettings); this.allowExpensiveQueries = allowExpensiveQueries; @@ -306,11 +310,15 @@ public IndexService( this.trimTranslogTask = new AsyncTrimTranslogTask(this); this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this); this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this); + if (READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(indexSettings.getNodeSettings())) { + this.asyncReplicationTask = new AsyncReplicationTask(this); + } this.translogFactorySupplier = translogFactorySupplier; this.recoverySettings = recoverySettings; this.remoteStoreSettings = remoteStoreSettings; this.compositeIndexSettings = compositeIndexSettings; this.fileCache = fileCache; + this.replicator = replicator; updateFsyncTaskIfNecessary(); } @@ -387,7 +395,8 @@ public IndexService( recoverySettings, remoteStoreSettings, fileCache, - null + null, + (s) -> {} ); } @@ -463,7 +472,8 @@ public IndexService( recoverySettings, remoteStoreSettings, null, - null + null, + s -> {} ); } @@ -472,6 +482,11 @@ static boolean needsMapperService(IndexSettings indexSettings, IndexCreationCont && indexCreationContext == IndexCreationContext.CREATE_INDEX); // metadata verification needs a mapper service } + // visible for tests + AsyncReplicationTask getReplicationTask() { + return asyncReplicationTask; + } + /** * Context for index creation * @@ -1142,11 +1157,22 @@ public synchronized void updateMetadata(final IndexMetadata currentIndexMetadata } onRefreshIntervalChange(); updateFsyncTaskIfNecessary(); + if (READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(indexSettings.getNodeSettings())) { + updateReplicationTask(); + } } metadataListeners.forEach(c -> c.accept(newIndexMetadata)); } + private void updateReplicationTask() { + try { + asyncReplicationTask.close(); + } finally { + asyncReplicationTask = new AsyncReplicationTask(this); + } + } + /** * Called whenever the refresh interval changes. This can happen in 2 cases - * 1. {@code cluster.default.index.refresh_interval} cluster setting changes. The change would only happen for @@ -1411,6 +1437,47 @@ public String toString() { } } + final class AsyncReplicationTask extends BaseAsyncTask { + + AsyncReplicationTask(IndexService indexService) { + super(indexService, indexService.getRefreshInterval()); + } + + @Override + protected void runInternal() { + indexService.maybeSyncSegments(false); + } + + @Override + protected String getThreadPool() { + return ThreadPool.Names.GENERIC; + } + + @Override + public String toString() { + return "replication"; + } + + @Override + protected boolean mustReschedule() { + return indexSettings.isSegRepEnabledOrRemoteNode() && super.mustReschedule(); + } + } + + private void maybeSyncSegments(boolean force) { + if (getRefreshInterval().millis() > 0 || force) { + for (IndexShard shard : this.shards.values()) { + try { + if (shard.routingEntry().isSearchOnly() && shard.routingEntry().active()) { + replicator.accept(shard); + } + } catch (IndexShardClosedException | AlreadyClosedException ex) { + // do nothing + } + } + } + } + final class AsyncTrimTranslogTask extends BaseAsyncTask { AsyncTrimTranslogTask(IndexService indexService) { diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index ee92e09698a98..9bffaedcbf482 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -1253,12 +1253,13 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { return this.latestReplicationCheckpoint; } - private boolean isPrimaryRelocation(String allocationId) { + // skip any shard that is a relocating primary or search only replica (not tracked by primary) + private boolean shouldSkipReplicationTimer(String allocationId) { Optional shardRouting = routingTable.shards() .stream() .filter(routing -> routing.allocationId().getId().equals(allocationId)) .findAny(); - return shardRouting.isPresent() && shardRouting.get().primary(); + return shardRouting.isPresent() && (shardRouting.get().primary() || shardRouting.get().isSearchOnly()); } private void createReplicationLagTimers() { @@ -1270,7 +1271,7 @@ private void createReplicationLagTimers() { // it is possible for a shard to be in-sync but not yet removed from the checkpoints collection after a failover event. if (cps.inSync && replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false - && isPrimaryRelocation(allocationId) == false + && shouldSkipReplicationTimer(allocationId) == false && latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint) && (indexSettings.isSegRepLocalEnabled() == true || isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(allocationId).currentNodeId()))) { @@ -1304,7 +1305,7 @@ public synchronized void startReplicationLagTimers(ReplicationCheckpoint checkpo final CheckpointState cps = e.getValue(); if (cps.inSync && replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false - && isPrimaryRelocation(e.getKey()) == false + && shouldSkipReplicationTimer(e.getKey()) == false && latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint) && cps.checkpointTimers.containsKey(latestReplicationCheckpoint)) { cps.checkpointTimers.get(latestReplicationCheckpoint).start(); @@ -1332,7 +1333,7 @@ public synchronized Set getSegmentReplicationStats entry -> entry.getKey().equals(this.shardAllocationId) == false && entry.getValue().inSync && replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false - && isPrimaryRelocation(entry.getKey()) == false + && shouldSkipReplicationTimer(entry.getKey()) == false /*Check if the current primary shard is migrating to remote and all the other shard copies of the same index still hasn't completely moved over to the remote enabled nodes. Ensures that: diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index f95b53e627567..1981e5c08d7c3 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -361,6 +361,7 @@ public class IndicesService extends AbstractLifecycleComponent private final SearchRequestStats searchRequestStats; private final FileCache fileCache; private final CompositeIndexSettings compositeIndexSettings; + private final Consumer replicator; @Override protected void doStart() { @@ -397,7 +398,8 @@ public IndicesService( CacheService cacheService, RemoteStoreSettings remoteStoreSettings, FileCache fileCache, - CompositeIndexSettings compositeIndexSettings + CompositeIndexSettings compositeIndexSettings, + Consumer replicator ) { this.settings = settings; this.threadPool = threadPool; @@ -506,6 +508,7 @@ protected void closeInternal() { this.remoteStoreSettings = remoteStoreSettings; this.compositeIndexSettings = compositeIndexSettings; this.fileCache = fileCache; + this.replicator = replicator; } public IndicesService( @@ -567,7 +570,8 @@ public IndicesService( cacheService, remoteStoreSettings, fileCache, - null + null, + (s) -> {} ); } @@ -629,6 +633,7 @@ public IndicesService( cacheService, remoteStoreSettings, null, + null, null ); } @@ -1046,7 +1051,8 @@ private synchronized IndexService createIndexService( translogFactorySupplier, this::getClusterDefaultRefreshInterval, this.recoverySettings, - this.remoteStoreSettings + this.remoteStoreSettings, + replicator ); } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index a042e08c2a53f..8fee3f671ecc9 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -157,6 +157,7 @@ public SegmentReplicationTargetService( ForceSyncRequest::new, new ForceSyncTransportRequestHandler() ); + replicator.setSourceFactory(sourceFactory); } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java index 417556c22636e..ad3bc1933208c 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.CorruptIndexException; import org.opensearch.OpenSearchCorruptionException; +import org.opensearch.common.SetOnce; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.ConcurrentCollections; @@ -44,12 +45,44 @@ public class SegmentReplicator { private final Map completedReplications = ConcurrentCollections.newConcurrentMap(); private final ThreadPool threadPool; + private final SetOnce sourceFactory; + public SegmentReplicator(ThreadPool threadPool) { this.onGoingReplications = new ReplicationCollection<>(logger, threadPool); this.threadPool = threadPool; + this.sourceFactory = new SetOnce<>(); + } + + /** + * Starts a replication event for the given shard. + * @param shard - {@link IndexShard} replica shard + */ + public void startReplication(IndexShard shard) { + if (sourceFactory.get() == null) return; + startReplication( + shard, + shard.getLatestReplicationCheckpoint(), + sourceFactory.get().get(shard), + new SegmentReplicationTargetService.SegmentReplicationListener() { + @Override + public void onReplicationDone(SegmentReplicationState state) { + logger.trace("Completed replication for {}", shard.shardId()); + } + + @Override + public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { + logger.error(() -> new ParameterizedMessage("Failed segment replication for {}", shard.shardId()), e); + if (sendShardFailure) { + shard.failShard("unrecoverable replication failure", e); + } + } + } + ); } - // TODO: Add public entrypoint for replication on an interval to be invoked via IndexService + void setSourceFactory(SegmentReplicationSourceFactory sourceFactory) { + this.sourceFactory.set(sourceFactory); + } /** * Start a round of replication and sync to at least the given checkpoint. diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 77aa49b91a2c3..42b84d8e79d88 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -893,6 +893,7 @@ protected Node( remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings); CacheModule cacheModule = new CacheModule(pluginsService.filterPlugins(CachePlugin.class), settings); CacheService cacheService = cacheModule.getCacheService(); + final SegmentReplicator segmentReplicator = new SegmentReplicator(threadPool); final IndicesService indicesService = new IndicesService( settings, pluginsService, @@ -922,7 +923,8 @@ protected Node( cacheService, remoteStoreSettings, fileCache, - compositeIndexSettings + compositeIndexSettings, + segmentReplicator::startReplication ); final IngestService ingestService = new IngestService( @@ -1424,7 +1426,7 @@ protected Node( new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService), indicesService, clusterService, - new SegmentReplicator(threadPool) + segmentReplicator ) ); b.bind(SegmentReplicationSourceService.class) @@ -1459,6 +1461,7 @@ protected Node( b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker); b.bind(SearchRequestOperationsCompositeListenerFactory.class).toInstance(searchRequestOperationsCompositeListenerFactory); + b.bind(SegmentReplicator.class).toInstance(segmentReplicator); }); injector = modules.createInjector(); diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index d656285deb2bf..3138297eb8ebb 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -263,7 +263,8 @@ private IndexService newIndexService(IndexModule module) throws IOException { translogFactorySupplier, () -> IndexSettings.DEFAULT_REFRESH_INTERVAL, DefaultRecoverySettings.INSTANCE, - DefaultRemoteStoreSettings.INSTANCE + DefaultRemoteStoreSettings.INSTANCE, + s -> {} ); } diff --git a/server/src/test/java/org/opensearch/index/IndexServiceTests.java b/server/src/test/java/org/opensearch/index/IndexServiceTests.java index 00c8b52d30c55..f0bad0e653453 100644 --- a/server/src/test/java/org/opensearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/opensearch/index/IndexServiceTests.java @@ -41,6 +41,7 @@ import org.opensearch.common.compress.CompressedXContent; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.index.Index; import org.opensearch.core.xcontent.MediaTypeRegistry; @@ -52,6 +53,7 @@ import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.translog.Translog; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.Plugin; import org.opensearch.test.InternalSettingsPlugin; import org.opensearch.test.OpenSearchSingleNodeTestCase; @@ -590,6 +592,57 @@ public void testIndexSortBackwardCompatible() { } } + public void testReplicationTask() throws Exception { + // create with docrep - task should not schedule + IndexService indexService = createIndex( + "docrep_index", + Settings.builder().put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT).build() + ); + final Index index = indexService.index(); + ensureGreen(index.getName()); + IndexService.AsyncReplicationTask task = indexService.getReplicationTask(); + assertFalse(task.isScheduled()); + assertFalse(task.mustReschedule()); + + // create for segrep - task should schedule + indexService = createIndex( + "segrep_index", + Settings.builder() + .put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "5s") + .build() + ); + final Index srIndex = indexService.index(); + ensureGreen(srIndex.getName()); + task = indexService.getReplicationTask(); + assertTrue(task.isScheduled()); + assertTrue(task.mustReschedule()); + assertEquals(5000, task.getInterval().millis()); + + // test we can update the interval + client().admin() + .indices() + .prepareUpdateSettings("segrep_index") + .setSettings(Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s")) + .get(); + + IndexService.AsyncReplicationTask updatedTask = indexService.getReplicationTask(); + assertNotSame(task, updatedTask); + assertFalse(task.isScheduled()); + assertTrue(task.isClosed()); + assertTrue(updatedTask.isScheduled()); + assertTrue(updatedTask.mustReschedule()); + assertEquals(1000, updatedTask.getInterval().millis()); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder() + .put(super.featureFlagSettings()) + .put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.getKey(), true) + .build(); + } + private static String createTestMapping(String type) { return " \"properties\": {\n" + " \"test\": {\n" diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java new file mode 100644 index 0000000000000..7acee449a1b46 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java @@ -0,0 +1,163 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.lucene.store.IOContext; +import org.opensearch.OpenSearchCorruptionException; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.lucene.Lucene; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.action.ActionListener; +import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.replication.TestReplicationSource; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; + +import org.mockito.Mockito; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +public class SegmentReplicatorTests extends IndexShardTestCase { + + private static final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + + public void testStartReplicationWithoutSourceFactory() { + ThreadPool threadpool = mock(ThreadPool.class); + ExecutorService mock = mock(ExecutorService.class); + when(threadpool.generic()).thenReturn(mock); + SegmentReplicator segmentReplicator = new SegmentReplicator(threadpool); + + IndexShard shard = mock(IndexShard.class); + segmentReplicator.startReplication(shard); + Mockito.verifyNoInteractions(mock); + } + + public void testStartReplicationRunsSuccessfully() throws Exception { + final IndexShard replica = newStartedShard(false, settings, new NRTReplicationEngineFactory()); + final IndexShard primary = newStartedShard(true, settings, new NRTReplicationEngineFactory()); + + // index and copy segments to replica. + int numDocs = randomIntBetween(10, 20); + for (int i = 0; i < numDocs; i++) { + indexDoc(primary, "_doc", Integer.toString(i)); + } + primary.refresh("test"); + + SegmentReplicator segmentReplicator = spy(new SegmentReplicator(threadPool)); + SegmentReplicationSourceFactory factory = mock(SegmentReplicationSourceFactory.class); + when(factory.get(replica)).thenReturn(new TestReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + resolveCheckpointListener(listener, primary); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + BiConsumer fileProgressTracker, + ActionListener listener + ) { + try { + Lucene.cleanLuceneIndex(indexShard.store().directory()); + Map segmentMetadataMap = primary.getSegmentMetadataMap(); + for (String file : segmentMetadataMap.keySet()) { + indexShard.store().directory().copyFrom(primary.store().directory(), file, file, IOContext.DEFAULT); + } + listener.onResponse(new GetSegmentFilesResponse(new ArrayList<>(segmentMetadataMap.values()))); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }); + segmentReplicator.setSourceFactory(factory); + segmentReplicator.startReplication(replica); + assertBusy(() -> assertDocCount(replica, numDocs)); + closeShards(primary, replica); + } + + public void testReplicationFails() throws Exception { + allowShardFailures(); + final IndexShard replica = newStartedShard(false, settings, new NRTReplicationEngineFactory()); + final IndexShard primary = newStartedShard(true, settings, new NRTReplicationEngineFactory()); + + SegmentReplicator segmentReplicator = spy(new SegmentReplicator(threadPool)); + SegmentReplicationSourceFactory factory = mock(SegmentReplicationSourceFactory.class); + when(factory.get(replica)).thenReturn(new TestReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + resolveCheckpointListener(listener, primary); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + BiConsumer fileProgressTracker, + ActionListener listener + ) { + OpenSearchCorruptionException corruptIndexException = new OpenSearchCorruptionException("test"); + try { + indexShard.store().markStoreCorrupted(corruptIndexException); + } catch (IOException e) { + throw new RuntimeException(e); + } + listener.onFailure(corruptIndexException); + } + }); + // assert shard failure on corruption + AtomicBoolean failureCallbackTriggered = new AtomicBoolean(false); + replica.addShardFailureCallback((ig) -> failureCallbackTriggered.set(true)); + segmentReplicator.setSourceFactory(factory); + segmentReplicator.startReplication(replica); + assertBusy(() -> assertTrue(failureCallbackTriggered.get())); + closeShards(primary, replica); + } + + protected void resolveCheckpointListener(ActionListener listener, IndexShard primary) { + try (final CopyState copyState = new CopyState(primary)) { + listener.onResponse( + new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) + ); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + +}