Skip to content

Commit

Permalink
Add tests for retryable errors.
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Feb 17, 2023
1 parent a5b3763 commit ab1159d
Showing 1 changed file with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.replication.TestReplicationSource;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardNotStartedException;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.index.shard.ShardNotInPrimaryModeException;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
Expand Down Expand Up @@ -57,8 +60,6 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {
private ReplicationCheckpoint initialCheckpoint;
private ReplicationCheckpoint aheadCheckpoint;

private ReplicationCheckpoint newPrimaryCheckpoint;

@Override
public void setUp() throws Exception {
super.setUp();
Expand All @@ -83,13 +84,6 @@ public void setUp() throws Exception {
initialCheckpoint.getSeqNo(),
initialCheckpoint.getSegmentInfosVersion() + 1
);
newPrimaryCheckpoint = new ReplicationCheckpoint(
initialCheckpoint.getShardId(),
initialCheckpoint.getPrimaryTerm() + 1,
initialCheckpoint.getSegmentsGen(),
initialCheckpoint.getSeqNo(),
initialCheckpoint.getSegmentInfosVersion() + 1
);
}

@Override
Expand Down Expand Up @@ -117,9 +111,20 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile
assertEquals(0, latch.getCount());
}

/**
 * Runs the shared {@code getCheckpointInfoAndThrow} scenario with an
 * {@link IndexShardNotStartedException} built for the primary shard in the
 * {@code POST_RECOVERY} state, expecting that no shard failure is requested.
 */
public void testRetryIfPrimaryIsNotStarted() throws InterruptedException {
    final IndexShardNotStartedException primaryNotStarted = new IndexShardNotStartedException(
        primaryShard.shardId(),
        IndexShardState.POST_RECOVERY
    );
    // false: the failure callback is expected to observe sendShardFailure == false.
    getCheckpointInfoAndThrow(primaryNotStarted, false);
}

/**
 * Runs the shared {@code getCheckpointInfoAndThrow} scenario with a
 * {@link ShardNotInPrimaryModeException} built for the primary shard in the
 * {@code POST_RECOVERY} state, expecting that no shard failure is requested.
 */
public void testRetryIfPrimaryIsNotInPrimaryMode() throws InterruptedException {
    final ShardNotInPrimaryModeException notInPrimaryMode = new ShardNotInPrimaryModeException(
        primaryShard.shardId(),
        IndexShardState.POST_RECOVERY
    );
    // false: the failure callback is expected to observe sendShardFailure == false.
    getCheckpointInfoAndThrow(notInPrimaryMode, false);
}

/**
 * Runs the shared {@code getCheckpointInfoAndThrow} scenario with a plain
 * {@link OpenSearchException}, expecting that a shard failure is requested.
 */
public void testReplicationFails() throws InterruptedException {
    final OpenSearchException genericFailure = new OpenSearchException("Fail");
    // true: the failure callback is expected to observe sendShardFailure == true.
    getCheckpointInfoAndThrow(genericFailure, true);
}

private void getCheckpointInfoAndThrow(Exception expectedError, boolean shardFailure) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
final OpenSearchException expectedError = new OpenSearchException("Fail");
SegmentReplicationSource source = new TestReplicationSource() {

@Override
Expand Down Expand Up @@ -157,6 +162,7 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile
// failures leave state object in last entered stage.
assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, state.getStage());
assertEquals(expectedError, e.getCause());
assertEquals(shardFailure, sendShardFailure);
latch.countDown();
}
}
Expand All @@ -166,6 +172,7 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile
assertEquals(0, latch.getCount());
}


public void testShardAlreadyReplicating() throws InterruptedException {
// Create a spy of Target Service so that we can verify invocation of startReplication call with specific checkpoint on it.
SegmentReplicationTargetService serviceSpy = spy(sut);
Expand Down

0 comments on commit ab1159d

Please sign in to comment.