diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index a9b6787d87bdf..16e9d78b17826 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -33,17 +33,23 @@ import org.opensearch.index.engine.Segment; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.plugins.Plugin; import org.opensearch.test.BackgroundIndexer; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.Collection; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -65,6 +71,11 @@ public static void assumeFeatureFlag() { assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE))); } + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class); + } + @Override public Settings indexSettings() { return Settings.builder() @@ -318,6 +329,65 @@ public void testReplicationAfterForceMerge() throws Exception { } } + public void testCancellation() throws Exception { + final String primaryNode = internalCluster().startNode(); + createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()); + ensureYellow(INDEX_NAME); + + final String replicaNode = internalCluster().startNode(); + + final SegmentReplicationSourceService segmentReplicationSourceService = internalCluster().getInstance( + SegmentReplicationSourceService.class, + primaryNode + ); + final IndexShard primaryShard = getIndexShard(primaryNode); + + CountDownLatch latch = new CountDownLatch(1); + + MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + primaryNode + )); + mockTransportService.addSendBehavior( + internalCluster().getInstance(TransportService.class, replicaNode), + (connection, requestId, action, request, options) -> { + if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) { + FileChunkRequest req = (FileChunkRequest) request; + logger.debug("file chunk [{}] lastChunk: {}", req, req.lastChunk()); + if (req.name().endsWith("cfs") && req.lastChunk()) { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + connection.sendRequest(requestId, action, request, options); + } + ); + + final int docCount = scaledRandomIntBetween(0, 200); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(docCount); + waitForDocs(docCount, indexer); + + flush(INDEX_NAME); + } + segmentReplicationSourceService.beforeIndexShardClosed(primaryShard.shardId(), primaryShard, indexSettings()); + latch.countDown(); + assertDocCounts(docCount, primaryNode); + } + public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { final String primaryNode = internalCluster().startNode(); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java index b1009a769d806..5f6ec7e505805 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -36,7 +36,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.times; - public class SegmentReplicationSourceHandlerTests extends IndexShardTestCase { private final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);