Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Checkpoint Replay on Replica Shard #3658

Merged
merged 9 commits into from
Jul 21, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.transport.TransportService;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand All @@ -49,6 +51,8 @@ public class SegmentReplicationTargetService implements IndexEventListener {

private final SegmentReplicationSourceFactory sourceFactory;

private final Map<ShardId, ReplicationCheckpoint> latestReceivedCheckpoint = new HashMap<>();
kartg marked this conversation as resolved.
Show resolved Hide resolved

/**
* The internal actions
*
Expand Down Expand Up @@ -91,6 +95,15 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
* @param replicaShard replica shard on which checkpoint is received
*/
public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) {

// Checks if received checkpoint is already present and ahead then it replaces old received checkpoint
if (latestReceivedCheckpoint.get(replicaShard.shardId()) != null) {
if (receivedCheckpoint.isAheadOf(latestReceivedCheckpoint.get(replicaShard.shardId()))) {
latestReceivedCheckpoint.replace(replicaShard.shardId(), receivedCheckpoint);
}
} else {
latestReceivedCheckpoint.put(replicaShard.shardId(), receivedCheckpoint);
}
if (onGoingReplications.isShardReplicating(replicaShard.shardId())) {
logger.trace(
() -> new ParameterizedMessage(
Expand All @@ -100,10 +113,24 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
);
return;
}
final Thread thread = Thread.currentThread();
if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) {
startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {}
public void onReplicationDone(SegmentReplicationState state) {
// if we received a checkpoint during the copy event that is ahead of this
// try and process it.
if (latestReceivedCheckpoint.get(replicaShard.shardId()).isAheadOf(replicaShard.getLatestReplicationCheckpoint())) {
Runnable runnable = () -> onNewCheckpoint(latestReceivedCheckpoint.get(replicaShard.shardId()), replicaShard);
// Checks if we are using same thread and forks if necessary.
if (thread == Thread.currentThread()){
threadPool.generic().execute(runnable);
}
else{
runnable.run();
}
}
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,40 @@ public void testNewCheckpoint_validationPassesAndReplicationFails() throws IOExc
closeShard(indexShard, false);
}

public void testReplicationOnDone() throws IOException {
Copy link
Member

@dreamer-89 dreamer-89 Jun 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Rishikesh1159 : Will be good to have test cases for failure scenario, multiple checkpoints and IT exercising this code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have already covered failure scenario in previous PR.For multiple checkpoints we are already passing 2 checkpoints in many of our tests, so not sure if there is need to multiple checkpoints test case, But if needed we can do that also. And for IT for this, yes it is good to have an IT for this code, but I think we can make a different PR for that

SegmentReplicationTargetService spy = spy(sut);
IndexShard spyShard = spy(indexShard);
ReplicationCheckpoint cp = indexShard.getLatestReplicationCheckpoint();
ReplicationCheckpoint newCheckpoint = new ReplicationCheckpoint(
cp.getShardId(),
cp.getPrimaryTerm(),
cp.getSegmentsGen(),
cp.getSeqNo(),
cp.getSegmentInfosVersion() + 1
);
ReplicationCheckpoint anotherNewCheckpoint = new ReplicationCheckpoint(
cp.getShardId(),
cp.getPrimaryTerm(),
cp.getSegmentsGen(),
cp.getSeqNo(),
cp.getSegmentInfosVersion() + 2
);
ArgumentCaptor<SegmentReplicationTargetService.SegmentReplicationListener> captor = ArgumentCaptor.forClass(
SegmentReplicationTargetService.SegmentReplicationListener.class
);
doNothing().when(spy).startReplication(any(), any(), any());
spy.onNewCheckpoint(newCheckpoint, spyShard);
spy.onNewCheckpoint(anotherNewCheckpoint, spyShard);
verify(spy, times(1)).startReplication(eq(newCheckpoint), any(), captor.capture());
verify(spy, times(1)).onNewCheckpoint(eq(anotherNewCheckpoint), any());
SegmentReplicationTargetService.SegmentReplicationListener listener = captor.getValue();
listener.onDone(new SegmentReplicationState(new ReplicationLuceneIndex()));
doNothing().when(spy).onNewCheckpoint(any(), any());
verify(spy, timeout(0).times(2)).onNewCheckpoint(eq(anotherNewCheckpoint), any());
closeShard(indexShard, false);

}

public void testBeforeIndexShardClosed_CancelsOngoingReplications() {
final SegmentReplicationTarget target = new SegmentReplicationTarget(
checkpoint,
Expand Down