Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Checkpoint Replay on Replica Shard #3658

Merged
merged 9 commits into from
Jul 21, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.transport.TransportService;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand All @@ -49,6 +51,8 @@ public class SegmentReplicationTargetService implements IndexEventListener {

private final SegmentReplicationSourceFactory sourceFactory;

private final Map<ShardId, ReplicationCheckpoint> latestReceivedCheckpoint = new HashMap<>();
kartg marked this conversation as resolved.
Show resolved Hide resolved

/**
* The internal actions
*
Expand Down Expand Up @@ -84,13 +88,34 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
}
}

/**
* Returns the Latest checkpoint received by replica shard based on shard ID.
* Returns null if replica shard has not received a checkpoint before
* @param shardId Shard Id of replica shard
*/
private ReplicationCheckpoint getLatestReceivedCheckpoint(ShardId shardId) {
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
if (latestReceivedCheckpoint.containsKey(shardId)) {
return latestReceivedCheckpoint.get(shardId);
}
return null;
}

/**
* Invoked when a new checkpoint is received from a primary shard.
* It checks if a new checkpoint should be processed or not and starts replication if needed.
* @param receivedCheckpoint received checkpoint that is checked for processing
* @param replicaShard replica shard on which checkpoint is received
*/
public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) {

// Checks if received checkpoint is already present and ahead then it replaces old received checkpoint
if (getLatestReceivedCheckpoint(replicaShard.shardId()) != null) {
if (receivedCheckpoint.isAheadOf(latestReceivedCheckpoint.get(replicaShard.shardId()))) {
latestReceivedCheckpoint.replace(replicaShard.shardId(), receivedCheckpoint);
}
} else {
latestReceivedCheckpoint.put(replicaShard.shardId(), receivedCheckpoint);
}
if (onGoingReplications.isShardReplicating(replicaShard.shardId())) {
logger.trace(
() -> new ParameterizedMessage(
Expand All @@ -103,7 +128,13 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) {
startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {}
public void onReplicationDone(SegmentReplicationState state) {
// if we received a checkpoint during the copy event that is ahead of this
// try and process it.
if (getLatestReceivedCheckpoint(replicaShard.shardId()).isAheadOf(replicaShard.getLatestReplicationCheckpoint())) {
onNewCheckpoint(getLatestReceivedCheckpoint(replicaShard.shardId()), replicaShard);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little concerned that this recursive call could lead to a long-running thread and/or function stack overflow. As a follow-up to this PR, we should look into wrapping checkpoint processing in a Runnable so we can execute it async from the new-checkpoint notification.

cc @mch2

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, i didn't realize the POC had the threading in place. Yes, if we have prior art, we can pull it into this PR

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QQ: Do newer checkpoints supersede older checkpoints? What happens when the replication is slow is backlogged with too many checkpoints to process.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I wrapped checkpoint processing in a Runnable in new commit

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Bukhtawar we only store one latest received checkpoint and replace it whenever there is a new checkpoint. We don't store all received checkpoints to replica shard. So, even when replication is slow, we won't be backlogged with too many checkpoints, because we process only one checkpoint (which is latest received checkpoint)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks and I am assuming replication usually happens on the generic threadpool to start with?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Rishikesh1159, @mch2 I would recommend doing explicit checks rather than simply forking to guarantee we don't end up using the same thread

void execute(Runnable runnable, Thread originalThread) {
            if (originalThread == Thread.currentThread()) {
                fork(runnable);
            } else {
                runnable.run();
            }
        } 

}
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,39 @@ public void testNewCheckpoint_validationPassesAndReplicationFails() throws IOExc
closeShard(indexShard, false);
}

public void testReplicationOnDone() throws IOException {
Copy link
Member

@dreamer-89 dreamer-89 Jun 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Rishikesh1159 : Will be good to have test cases for failure scenario, multiple checkpoints and IT exercising this code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have already covered failure scenario in previous PR.For multiple checkpoints we are already passing 2 checkpoints in many of our tests, so not sure if there is need to multiple checkpoints test case, But if needed we can do that also. And for IT for this, yes it is good to have an IT for this code, but I think we can make a different PR for that

SegmentReplicationTargetService spy = spy(sut);
IndexShard spyShard = spy(indexShard);
ReplicationCheckpoint cp = indexShard.getLatestReplicationCheckpoint();
ReplicationCheckpoint newCheckpoint = new ReplicationCheckpoint(
cp.getShardId(),
cp.getPrimaryTerm(),
cp.getSegmentsGen(),
cp.getSeqNo(),
cp.getSegmentInfosVersion() + 1
);
ReplicationCheckpoint anotherNewCheckpoint = new ReplicationCheckpoint(
cp.getShardId(),
cp.getPrimaryTerm(),
cp.getSegmentsGen(),
cp.getSeqNo(),
cp.getSegmentInfosVersion() + 2
);
ArgumentCaptor<SegmentReplicationTargetService.SegmentReplicationListener> captor = ArgumentCaptor.forClass(
SegmentReplicationTargetService.SegmentReplicationListener.class
);
doNothing().when(spy).startReplication(any(), any(), any());
spy.onNewCheckpoint(newCheckpoint, spyShard);
spy.onNewCheckpoint(anotherNewCheckpoint, spyShard);
verify(spy, times(1)).startReplication(eq(newCheckpoint), any(), captor.capture());
verify(spy, times(1)).onNewCheckpoint(eq(anotherNewCheckpoint), any());
SegmentReplicationTargetService.SegmentReplicationListener listener = captor.getValue();
listener.onDone(new SegmentReplicationState(new ReplicationLuceneIndex()));
verify(spy, times(2)).onNewCheckpoint(eq(anotherNewCheckpoint), any());
closeShard(indexShard, false);

}

public void testBeforeIndexShardClosed_CancelsOngoingReplications() {
final SegmentReplicationTarget target = new SegmentReplicationTarget(
checkpoint,
Expand Down