Skip to content

Commit

Permalink
[Semgnet Replication] Update flaky testOnNewCheckpointFromNewPrimaryC…
Browse files Browse the repository at this point in the history
…ancelOngoingReplication unit test (opensearch-project#4414)

* [Semgnet Replication] Update flaky testOnNewCheckpointFromNewPrimaryCancelOngoingReplication unit test

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Add changelog entry

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Update changelog entry

Signed-off-by: Suraj Singh <surajrider@gmail.com>

Signed-off-by: Suraj Singh <surajrider@gmail.com>
  • Loading branch information
dreamer-89 committed Sep 6, 2022
1 parent 3d90d35 commit 72a6b06
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fixed cancellation of segment replication events ([#4225](https://github.com/opensearch-project/OpenSearch/pull/4225))
- [Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica ([#4363](https://github.com/opensearch-project/OpenSearch/pull/4363))
- [Segment Replication] Bump segment infos counter before commit during replica promotion ([#4365](https://github.com/opensearch-project/OpenSearch/pull/4365))
- [Segment Replication] Update flaky testOnNewCheckpointFromNewPrimaryCancelOngoingReplication unit test ([#4414](https://github.com/opensearch-project/OpenSearch/pull/4414))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ public void startReplication(ActionListener<Void> listener) {
final StepListener<GetSegmentFilesResponse> getFilesListener = new StepListener<>();
final StepListener<Void> finalizeListener = new StepListener<>();

cancellableThreads.checkForCancel();
logger.trace("[shardId {}] Replica starting replication [id {}]", shardId().getId(), getId());
// Get list of files to copy from this checkpoint.
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO);
source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardTestCase;
Expand All @@ -29,6 +30,7 @@
import java.util.concurrent.TimeUnit;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doAnswer;
Expand All @@ -37,6 +39,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.eq;
import static org.opensearch.indices.replication.SegmentReplicationState.Stage.CANCELLED;

public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {

Expand Down Expand Up @@ -215,24 +218,25 @@ public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication() throws I
// Mocking response when startReplication is called on targetSpy we send a new checkpoint to serviceSpy and later reduce countdown
// of latch.
doAnswer(invocation -> {
final ActionListener<Void> listener = invocation.getArgument(0);
// short circuit loop on new checkpoint request
doReturn(null).when(serviceSpy).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any());
// a new checkpoint arrives before we've completed.
serviceSpy.onNewCheckpoint(newPrimaryCheckpoint, replicaShard);
listener.onResponse(null);
latch.countDown();
try {
invocation.callRealMethod();
} catch (CancellableThreads.ExecutionCancelledException e) {
latch.countDown();
}
return null;
}).when(targetSpy).startReplication(any());
doNothing().when(targetSpy).onDone();

// start replication. This adds the target to on-ongoing replication collection
serviceSpy.startReplication(targetSpy);

latch.await();
// wait for the new checkpoint to arrive, before the listener completes.
latch.await(5, TimeUnit.SECONDS);
doNothing().when(targetSpy).startReplication(any());
assertEquals(CANCELLED, targetSpy.state().getStage());
verify(targetSpy, times(1)).cancel("Cancelling stuck target after new primary");
verify(serviceSpy, times(1)).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any());
closeShards(replicaShard);
}

public void testNewCheckpointBehindCurrentCheckpoint() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1205,6 +1205,7 @@ public void getCheckpointMetadata(
copyState.getPendingDeleteFiles()
)
);
copyState.decRef();
} catch (IOException e) {
logger.error("Unexpected error computing CopyState", e);
Assert.fail("Failed to compute copyState");
Expand Down

0 comments on commit 72a6b06

Please sign in to comment.