From ab1159db99a386c9e8b2e0bef517b372fbe4c26b Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Thu, 16 Feb 2023 20:40:47 -0800 Subject: [PATCH] Add tests for retryable errors. Signed-off-by: Marc Handalian --- .../SegmentReplicationTargetServiceTests.java | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index ee51380ecb605..a49a34ec7b665 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -21,7 +21,10 @@ import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardNotStartedException; +import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.shard.ShardNotInPrimaryModeException; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; @@ -57,8 +60,6 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { private ReplicationCheckpoint initialCheckpoint; private ReplicationCheckpoint aheadCheckpoint; - private ReplicationCheckpoint newPrimaryCheckpoint; - @Override public void setUp() throws Exception { super.setUp(); @@ -83,13 +84,6 @@ public void setUp() throws Exception { initialCheckpoint.getSeqNo(), initialCheckpoint.getSegmentInfosVersion() + 1 ); - newPrimaryCheckpoint = new ReplicationCheckpoint( - initialCheckpoint.getShardId(), - initialCheckpoint.getPrimaryTerm() + 1, - initialCheckpoint.getSegmentsGen(), - initialCheckpoint.getSeqNo(), - initialCheckpoint.getSegmentInfosVersion() + 1 - ); } @Override @@ -117,9 +111,20 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile assertEquals(0, latch.getCount()); } + public void testRetryIfPrimaryIsNotStarted() throws InterruptedException { + getCheckpointInfoAndThrow(new IndexShardNotStartedException(primaryShard.shardId(), IndexShardState.POST_RECOVERY), false); + } + + public void testRetryIfPrimaryIsNotInPrimaryMode() throws InterruptedException { + getCheckpointInfoAndThrow(new ShardNotInPrimaryModeException(primaryShard.shardId(), IndexShardState.POST_RECOVERY), false); + } + public void testReplicationFails() throws InterruptedException { + getCheckpointInfoAndThrow(new OpenSearchException("Fail"), true); + } + + private void getCheckpointInfoAndThrow(Exception expectedError, boolean shardFailure) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - final OpenSearchException expectedError = new OpenSearchException("Fail"); SegmentReplicationSource source = new TestReplicationSource() { @Override @@ -157,6 +162,7 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile // failures leave state object in last entered stage. assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, state.getStage()); assertEquals(expectedError, e.getCause()); + assertEquals(shardFailure, sendShardFailure); latch.countDown(); } } @@ -166,6 +172,7 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile assertEquals(0, latch.getCount()); } + public void testShardAlreadyReplicating() throws InterruptedException { // Create a spy of Target Service so that we can verify invocation of startReplication call with specific checkpoint on it. SegmentReplicationTargetService serviceSpy = spy(sut);