Skip to content

Commit

Permalink
[Semgnet Replication] Update flaky testOnNewCheckpointFromNewPrimaryC…
Browse files Browse the repository at this point in the history
…ancelOngoingReplication unit test (#4414) (#4425)

* [Semgnet Replication] Update flaky testOnNewCheckpointFromNewPrimaryCancelOngoingReplication unit test

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Add changelog entry

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Update changelog entry

Signed-off-by: Suraj Singh <surajrider@gmail.com>

Signed-off-by: Suraj Singh <surajrider@gmail.com>

Signed-off-by: Suraj Singh <surajrider@gmail.com>
  • Loading branch information
dreamer-89 authored Sep 7, 2022
1 parent 1f339e5 commit 1eae8f4
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fixed cancellation of segment replication events ([#4225](https://github.com/opensearch-project/OpenSearch/pull/4225))
- [Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica ([#4363](https://github.com/opensearch-project/OpenSearch/pull/4363))
- [Segment Replication] Bump segment infos counter before commit during replica promotion ([#4365](https://github.com/opensearch-project/OpenSearch/pull/4365))
- [Segment Replication] Update flaky testOnNewCheckpointFromNewPrimaryCancelOngoingReplication unit test ([#4414](https://github.com/opensearch-project/OpenSearch/pull/4414))
- [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386))
- [Segment Replication] Fix NoSuchFileExceptions with segment replication when computing primary metadata snapshots ([#4366](https://github.com/opensearch-project/OpenSearch/pull/4366))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ public void startReplication(ActionListener<Void> listener) {
final StepListener<GetSegmentFilesResponse> getFilesListener = new StepListener<>();
final StepListener<Void> finalizeListener = new StepListener<>();

cancellableThreads.checkForCancel();
logger.trace("[shardId {}] Replica starting replication [id {}]", shardId().getId(), getId());
// Get list of files to copy from this checkpoint.
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO);
source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardTestCase;
Expand All @@ -29,6 +30,7 @@
import java.util.concurrent.TimeUnit;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doAnswer;
Expand All @@ -37,6 +39,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.eq;
import static org.opensearch.indices.replication.SegmentReplicationState.Stage.CANCELLED;

public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {

Expand Down Expand Up @@ -215,24 +218,25 @@ public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication() throws I
// Mocking response when startReplication is called on targetSpy we send a new checkpoint to serviceSpy and later reduce countdown
// of latch.
doAnswer(invocation -> {
final ActionListener<Void> listener = invocation.getArgument(0);
// short circuit loop on new checkpoint request
doReturn(null).when(serviceSpy).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any());
// a new checkpoint arrives before we've completed.
serviceSpy.onNewCheckpoint(newPrimaryCheckpoint, replicaShard);
listener.onResponse(null);
latch.countDown();
try {
invocation.callRealMethod();
} catch (CancellableThreads.ExecutionCancelledException e) {
latch.countDown();
}
return null;
}).when(targetSpy).startReplication(any());
doNothing().when(targetSpy).onDone();

// start replication. This adds the target to on-ongoing replication collection
serviceSpy.startReplication(targetSpy);

latch.await();
// wait for the new checkpoint to arrive, before the listener completes.
latch.await(5, TimeUnit.SECONDS);
doNothing().when(targetSpy).startReplication(any());
assertEquals(CANCELLED, targetSpy.state().getStage());
verify(targetSpy, times(1)).cancel("Cancelling stuck target after new primary");
verify(serviceSpy, times(1)).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any());
closeShards(replicaShard);
}

public void testNewCheckpointBehindCurrentCheckpoint() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,7 @@ public void getCheckpointMetadata(
listener.onResponse(
new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes())
);
copyState.decRef();
} catch (IOException e) {
logger.error("Unexpected error computing CopyState", e);
Assert.fail("Failed to compute copyState");
Expand Down

0 comments on commit 1eae8f4

Please sign in to comment.