diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e033e1ffb2bf..cb48b3aedeea5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fix flaky random test `NRTReplicationEngineTests.testUpdateSegments` ([#4352](https://github.com/opensearch-project/OpenSearch/pull/4352)) - [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386)) - [Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica ([#4363](https://github.com/opensearch-project/OpenSearch/pull/4363)) +- [Segment Replication] Update flaky testOnNewCheckpointFromNewPrimaryCancelOngoingReplication unit test ([#4414](https://github.com/opensearch-project/OpenSearch/pull/4414)) - Fixed the `_cat/shards/10_basic.yml` test cases fix. ### Security diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 7c28406036ddd..6a9406aca13b9 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -160,9 +160,9 @@ public void startReplication(ActionListener listener) { final StepListener getFilesListener = new StepListener<>(); final StepListener finalizeListener = new StepListener<>(); + cancellableThreads.checkForCancel(); logger.trace("[shardId {}] Replica starting replication [id {}]", shardId().getId(), getId()); // Get list of files to copy from this checkpoint. - cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO); source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 1d253b0a9a300..f2eb635f24bbf 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -15,6 +15,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.CancellableThreads; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; @@ -29,6 +30,7 @@ import java.util.concurrent.TimeUnit; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.doAnswer; @@ -37,6 +39,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.eq; +import static org.opensearch.indices.replication.SegmentReplicationState.Stage.CANCELLED; public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { @@ -215,24 +218,25 @@ public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication() throws I // Mocking response when startReplication is called on targetSpy we send a new checkpoint to serviceSpy and later reduce countdown // of latch. doAnswer(invocation -> { - final ActionListener listener = invocation.getArgument(0); + // short circuit loop on new checkpoint request + doReturn(null).when(serviceSpy).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any()); // a new checkpoint arrives before we've completed. serviceSpy.onNewCheckpoint(newPrimaryCheckpoint, replicaShard); - listener.onResponse(null); - latch.countDown(); + try { + invocation.callRealMethod(); + } catch (CancellableThreads.ExecutionCancelledException e) { + latch.countDown(); + } return null; }).when(targetSpy).startReplication(any()); - doNothing().when(targetSpy).onDone(); // start replication. This adds the target to on-ongoing replication collection serviceSpy.startReplication(targetSpy); - + latch.await(); // wait for the new checkpoint to arrive, before the listener completes. - latch.await(5, TimeUnit.SECONDS); - doNothing().when(targetSpy).startReplication(any()); + assertEquals(CANCELLED, targetSpy.state().getStage()); verify(targetSpy, times(1)).cancel("Cancelling stuck target after new primary"); verify(serviceSpy, times(1)).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any()); - closeShards(replicaShard); } public void testNewCheckpointBehindCurrentCheckpoint() { diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 1b40cb4f2dfa3..0838a1fe87aa4 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1207,6 +1207,7 @@ public void getCheckpointMetadata( copyState.getPendingDeleteFiles() ) ); + copyState.decRef(); } catch (IOException e) { logger.error("Unexpected error computing CopyState", e); Assert.fail("Failed to compute copyState");