From beda3ce2b05606f469b7e1e71099e9e0fcae0637 Mon Sep 17 00:00:00 2001 From: Rishikesh Pasham <62345295+Rishikesh1159@users.noreply.github.com> Date: Mon, 3 Oct 2022 08:45:47 -0700 Subject: [PATCH] Adding check to make sure checkpoint is not processed when a shard's shard routing is primary. (#4630) Signed-off-by: Rishikesh1159 Signed-off-by: Rishikesh1159 Co-authored-by: Suraj Singh --- CHANGELOG.md | 1 + .../opensearch/index/shard/IndexShard.java | 4 +++ .../SegmentReplicationIndexShardTests.java | 31 +++++++++++++++++++ 3 files changed, 36 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ad53f4fdc803..a84815ac1cc52 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -88,6 +88,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fixed the ignore_malformed setting to also ignore objects ([#4494](https://github.com/opensearch-project/OpenSearch/pull/4494)) - [Segment Replication] Ignore lock file when testing cleanupAndPreserveLatestCommitPoint ([#4544](https://github.com/opensearch-project/OpenSearch/pull/4544)) - Updated jackson to 2.13.4 and snakeyml to 1.32 ([#4556](https://github.com/opensearch-project/OpenSearch/pull/4556)) +- [Segment Replication] Adding check to make sure checkpoint is not processed when a shard's shard routing is primary ([#4630](https://github.com/opensearch-project/OpenSearch/pull/4630)) - [Bug]: Fixed invalid location of JDK dependency for arm64 architecture([#4613](https://github.com/opensearch-project/OpenSearch/pull/4613)) - [Bug]: Alias filter lost after rollover ([#4499](https://github.com/opensearch-project/OpenSearch/pull/4499)) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 9185ef0d440ce..d05f7c34f80ce 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1441,6 +1441,10 @@ public final boolean 
shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints."); return false; } + if (this.routingEntry().primary()) { + logger.warn("Ignoring new replication checkpoint - primary shard cannot receive any checkpoints."); + return false; + } ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); if (localCheckpoint.isAheadOf(requestCheckpoint)) { logger.trace( diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 007317f6e71cd..da04ea1b9914b 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -62,6 +62,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.spy; public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase { @@ -208,6 +209,36 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException { closeShards(shard); } + /** + * here we are starting a new primary shard and testing that we don't process a checkpoint on a shard when its shard routing is primary. + */ + public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { + IndexShard primaryShard = newStartedShard(true); + SegmentReplicationTargetService sut; + sut = prepareForReplication(primaryShard); + SegmentReplicationTargetService spy = spy(sut); + + // Starting a new shard in PrimaryMode and shard routing primary. 
+ IndexShard spyShard = spy(primaryShard); + String id = primaryShard.routingEntry().allocationId().getId(); + + // Starting relocation handoff + primaryShard.getReplicationTracker().startRelocationHandoff(id); + + // Completing relocation handoff. + primaryShard.getReplicationTracker().completeRelocationHandoff(); + + // Assert that primary shard is no longer in Primary Mode and shard routing is still Primary + assertEquals(false, primaryShard.getReplicationTracker().isPrimaryMode()); + assertEquals(true, primaryShard.routingEntry().primary()); + + spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, 0L), spyShard); + + // Verify that checkpoint is not processed as shard routing is primary. + verify(spy, times(0)).startReplication(any(), any(), any()); + closeShards(primaryShard); + } + public void testReplicaReceivesGenIncrease() throws Exception { try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { shards.startAll();