Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication ] Add shard routing primary check when processing a checkpoint. #4630

Merged
merged 2 commits into from
Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fixed the ignore_malformed setting to also ignore objects ([#4494](https://github.com/opensearch-project/OpenSearch/pull/4494))
- [Segment Replication] Ignore lock file when testing cleanupAndPreserveLatestCommitPoint ([#4544](https://github.com/opensearch-project/OpenSearch/pull/4544))
- Updated jackson to 2.13.4 and snakeyml to 1.32 ([#4556](https://github.com/opensearch-project/OpenSearch/pull/4556))
- [Segment Replication] Adding check to make sure checkpoint is not processed when a shard's shard routing is primary ([#4630](https://github.com/opensearch-project/OpenSearch/pull/4630))

### Security
- CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1441,6 +1441,10 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints.");
return false;
}
if (this.routingEntry().primary()) {
logger.warn("Ignoring new replication checkpoint - primary shard cannot receive any checkpoints.");
return false;
}
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
if (localCheckpoint.isAheadOf(requestCheckpoint)) {
logger.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;

public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase {

Expand Down Expand Up @@ -208,6 +209,36 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException {
closeShards(shard);
}

/**
* here we are starting a new primary shard and testing that we don't process a checkpoint on a shard when it's shard routing is primary.
*/
public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
IndexShard primaryShard = newStartedShard(true);
SegmentReplicationTargetService sut;
sut = prepareForReplication(primaryShard);
SegmentReplicationTargetService spy = spy(sut);

// Starting a new shard in PrimaryMode and shard routing primary.
IndexShard spyShard = spy(primaryShard);
String id = primaryShard.routingEntry().allocationId().getId();

// Starting relocation handoff
primaryShard.getReplicationTracker().startRelocationHandoff(id);

// Completing relocation handoff.
primaryShard.getReplicationTracker().completeRelocationHandoff();

// Assert that primary shard is no longer in Primary Mode and shard routing is still Primary
assertEquals(false, primaryShard.getReplicationTracker().isPrimaryMode());
assertEquals(true, primaryShard.routingEntry().primary());

spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, 0L), spyShard);

// Verify that checkpoint is not processed as shard routing is primary.
verify(spy, times(0)).startReplication(any(), any(), any());
closeShards(primaryShard);
}

public void testReplicaReceivesGenIncrease() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
shards.startAll();
Expand Down