
Update Segment replication stats APIs to support pull based architecture #15534

Closed
Tracked by #15306
mch2 opened this issue Aug 30, 2024 · 8 comments · Fixed by #16678
Labels
v2.19.0 Issues and PRs related to version 2.19.0

Comments

@mch2
Member

mch2 commented Aug 30, 2024

With #4577, replicas will sync directly with their replication source rather than receiving updates pushed from the primary.

Today, stats are collected at the primary to support these APIs and to enforce backpressure. With the reader/writer split, we will ensure that primaries have no direct communication with replicas, so primaries will no longer be able to collect these stats.

To fix this we can update our stats APIs to fetch the latest checkpoint from each replica directly and compute the required stats at the coordinator, eliminating the need for primaries to capture these stats.

@mch2 mch2 changed the title Update Segment replication stats APIs and backpressure. Update Segment replication stats APIs. Aug 30, 2024
@mch2 mch2 changed the title Update Segment replication stats APIs. Update Segment replication stats APIs to support pull based architecture Aug 30, 2024
@mch2 mch2 added v2.18.0 Issues and PRs related to version 2.18.0 and removed untriaged labels Sep 16, 2024
@vinaykpud
Contributor

Hi @mch2, can I take this task?

@getsaurabh02 getsaurabh02 moved this from Todo to In Progress in Performance Roadmap Oct 7, 2024
@mch2 mch2 added v2.19.0 Issues and PRs related to version 2.19.0 and removed v2.18.0 Issues and PRs related to version 2.18.0 labels Nov 4, 2024
@vinaykpud
Contributor

vinaykpud commented Nov 13, 2024

Documentation for the current CAT SegRep Stats API:
https://opensearch.org/docs/latest/api-reference/cat/cat-segment-replication/

Below, I try to explain how this API currently works:

Currently, once a replica completes replication for the latest checkpoint it received, it sends its updated visible checkpoint to the primary shard. The primary keeps track of the checkpoints replicated on all of its replicas. Whenever stats are requested, the primary calculates them from these tracked replica checkpoints. The algorithm for this API works as follows:

Steps:

  1. The coordinator receives the request and broadcasts it to all nodes.
  2. The request is executed on every shard on each node.
    1. On a primary shard, it builds and returns the SegmentReplicationPerGroupStats. As part of this, it computes the stats of each of its replicas, i.e. one SegmentReplicationShardStats per replica.
    2. On a replica shard, it returns the SegmentReplicationState.
  3. The responses are collected at the coordinator. The SegmentReplicationState received from each replica shard is used to enrich the SegmentReplicationShardStats calculated at the primary.
  4. The coordinator returns the combined response.
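Steps 3 and 4 can be sketched as a join on (shardId, allocationId). This is a simplified illustration, not the actual transport action: the records below are hypothetical stand-ins for SegmentReplicationPerGroupStats, SegmentReplicationShardStats and SegmentReplicationState, and "enrich" is reduced to attaching the replica's current stage.

```java
import java.util.*;

// Hypothetical stand-ins for the OpenSearch types; field sets are assumptions.
record ReplicaShardStats(String allocationId, long checkpointsBehind, long bytesBehind,
                         long currentLagMillis, Optional<String> stage) {
    ReplicaShardStats withStage(String s) {
        return new ReplicaShardStats(allocationId, checkpointsBehind, bytesBehind, currentLagMillis, Optional.of(s));
    }
}
record PerGroupStats(String shardId, List<ReplicaShardStats> replicaStats) {}
record ReplicaState(String shardId, String allocationId, String stage) {}

final class CoordinatorMerge {
    // Step 3: enrich the primary-computed stats with the state reported by each replica.
    static List<PerGroupStats> merge(List<PerGroupStats> fromPrimaries, List<ReplicaState> fromReplicas) {
        // Index replica states by "shardId/allocationId" for constant-time lookup.
        Map<String, ReplicaState> states = new HashMap<>();
        for (ReplicaState s : fromReplicas) {
            states.put(s.shardId() + "/" + s.allocationId(), s);
        }
        List<PerGroupStats> out = new ArrayList<>();
        for (PerGroupStats group : fromPrimaries) {
            List<ReplicaShardStats> enriched = new ArrayList<>();
            for (ReplicaShardStats r : group.replicaStats()) {
                ReplicaState s = states.get(group.shardId() + "/" + r.allocationId());
                // A replica that did not respond keeps the primary's view unchanged.
                enriched.add(s == null ? r : r.withStage(s.stage()));
            }
            out.add(new PerGroupStats(group.shardId(), enriched));
        }
        return out; // Step 4: the combined response
    }
}
```

The key point is that every number here originates on the primary; replicas only contribute descriptive state, which is exactly what breaks once primaries can no longer see search replicas.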

How does replication work for a regular replica?

The primary shard publishes the checkpoint, and all replica shards receive it. Once a replica receives the checkpoint, it starts the replication process for that checkpoint. When replication completes, the replica updates its latestVisibleCheckpoint on the primary.

How does replication work for a search replica?

A search replica runs an async process continuously at a fixed interval (1s by default). The job of this process is to call startReplication with the remote store as the SegmentReplicationSource, so it fetches the checkpoint from the remote store and performs the replication.
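The pull loop can be pictured roughly as below. This is a minimal sketch, assuming the remote store exposes a monotonically increasing checkpoint version through a plain LongSupplier; SearchReplicaPoller and pollOnce are hypothetical names, and the actual replication step is reduced to a counter.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical sketch of a search replica's periodic pull from the remote store.
final class SearchReplicaPoller {
    private final LongSupplier remoteLatestCheckpoint; // assumption: monotonically increasing version
    private final AtomicLong appliedCheckpoint = new AtomicLong(-1);
    private final AtomicLong replications = new AtomicLong();

    SearchReplicaPoller(LongSupplier remoteLatestCheckpoint) {
        this.remoteLatestCheckpoint = remoteLatestCheckpoint;
    }

    // One iteration of the async job; returns true if a replication ran.
    boolean pollOnce() {
        long remote = remoteLatestCheckpoint.getAsLong();
        if (remote <= appliedCheckpoint.get()) {
            return false; // already up to date, nothing to pull
        }
        // Placeholder for "startReplication with the remote store as the source".
        replications.incrementAndGet();
        appliedCheckpoint.set(remote);
        return true;
    }

    // Run pollOnce repeatedly with a fixed delay between iterations (e.g. 1000 ms).
    ScheduledFuture<?> start(ScheduledExecutorService scheduler, long intervalMillis) {
        return scheduler.scheduleWithFixedDelay(this::pollOnce, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    long appliedCheckpoint() { return appliedCheckpoint.get(); }
    long replications() { return replications.get(); }
}
```

Usage would be something like `poller.start(Executors.newSingleThreadScheduledExecutor(), 1000)`. Note that nothing in this loop ever talks to the primary, which is why the primary cannot track a search replica's progress.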

Impact of the search replica on stats calculation:

With the addition of the search replica, we cannot continue to calculate the SegRep stats the way we do now. With the reader/writer split, we ensure that primaries have no direct communication with replicas, i.e. the primary is not aware of the search replica checkpoints. So we have to calculate the stats for search replicas differently.

Let's now look at the different metrics we return as part of the response:

CheckPointBehindCount: Number of checkpoints by which the replica is behind the primary shard.

BytesBehindCount: Number of bytes by which the replica is behind the primary shard.

CurrentReplicationLag: The time elapsed while waiting for a replica shard to catch up to the primary shard.

LastCompletedReplicationLag: The time taken for a replica shard to catch up to the latest primary shard refresh.

RejectedRequests: The number of rejected requests for the replication group.

Let's now look at how the SegRep stats metrics are currently calculated.

Currently, the calculation happens in ReplicationTracker. It requires three important pieces of data:

  1. VisibleCheckPoint at the shard
    1. The checkpoint that the shard has most recently computed
  2. LatestReplicationCheckpoint
    1. The latest checkpoint at the primary shard
  3. SegmentReplicationLagTimer
    1. It has two values: CreationTime and StartTime.
    2. The timer is created when the checkpoint is created/refreshed at the primary, and started when the checkpoint is published.
  • CheckPointBehindCount is calculated based on the active checkpoint timers.
  • VisibleCheckPoint and LatestReplicationCheckpoint are used to calculate the BytesBehindCount, which says how far the replica is behind the primary.
  • CurrentReplicationLag is calculated based on the SegmentReplicationLagTimer.
  • LastCompletedReplicationLag is the last completed replication time tracked by ReplicationTracker.
  • RejectedRequests comes from the pressure service.
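The bullet points above can be illustrated with a simplified model. This is a sketch under assumptions, not ReplicationTracker itself: a checkpoint is modelled as a map of segment file name to length, there is one active LagTimer per checkpoint the replica has not yet made visible, and CurrentReplicationLag is assumed to be measured from the creation time of the oldest outstanding timer (the real tracker may measure differently). Checkpoint, LagTimer and ReplicaLagStats are hypothetical names.

```java
import java.util.*;

// Hypothetical, simplified model of the inputs listed above.
record Checkpoint(long version, Map<String, Long> files) {}  // segment file name -> length in bytes
record LagTimer(long checkpointVersion, long createdAtMillis, long startedAtMillis) {}

final class ReplicaLagStats {
    // CheckPointBehindCount: one active timer per checkpoint not yet visible on the replica.
    static long checkpointsBehind(List<LagTimer> activeTimers) {
        return activeTimers.size();
    }

    // BytesBehindCount: bytes of files in the latest checkpoint that the visible checkpoint lacks.
    static long bytesBehind(Checkpoint latest, Checkpoint visible) {
        long bytes = 0;
        for (Map.Entry<String, Long> e : latest.files().entrySet()) {
            Long have = visible.files().get(e.getKey());
            if (!e.getValue().equals(have)) { // file missing, or same name with a different length
                bytes += e.getValue();
            }
        }
        return bytes;
    }

    // CurrentReplicationLag: age of the oldest outstanding timer (assumed: measured from creation).
    static long currentLagMillis(List<LagTimer> activeTimers, long nowMillis) {
        return activeTimers.stream()
                .mapToLong(t -> nowMillis - t.createdAtMillis())
                .max()
                .orElse(0L);
    }
}
```

All three functions need the primary's latest checkpoint or its timers, which is why this calculation can only live on the primary today.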

Currently, the primary calculates the diff between latestReplicationCheckpoint and visibleReplicationCheckpoint for every CheckpointState, i.e. for every replica shard.

What is latestReplicationCheckpoint?

IndexShard has two refresh listeners:

  1. ReplicationCheckPointUpdater
    1. Sets the latestReplicationCheckpoint on the primary shard after a refresh
  2. CheckpointRefreshListener (needed only for local segrep)
    1. Publishes the latestReplicationCheckpoint
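The interplay of the two listeners can be sketched as follows, assuming a checkpoint is just a long version. PrimaryShardSketch and RefreshHook are hypothetical names, not the IndexShard API. The sketch shows that the updater has to run before the publisher, and that the publisher is only registered for local segrep.

```java
import java.util.*;
import java.util.function.Consumer;

// Hypothetical post-refresh hook, standing in for a refresh listener.
interface RefreshHook { void afterRefresh(long newCheckpointVersion); }

final class PrimaryShardSketch {
    private long latestReplicationCheckpoint = -1;
    private final List<RefreshHook> hooks = new ArrayList<>();

    PrimaryShardSketch(boolean localSegRep, Consumer<Long> publisher) {
        // 1. Updater: record the checkpoint produced by the refresh.
        hooks.add(v -> latestReplicationCheckpoint = v);
        // 2. Publisher: only needed when replicas receive checkpoints pushed from the primary.
        if (localSegRep) {
            hooks.add(v -> publisher.accept(latestReplicationCheckpoint));
        }
    }

    void refresh(long newVersion) {
        // Order matters: the checkpoint is updated before it is published.
        for (RefreshHook h : hooks) {
            h.afterRefresh(newVersion);
        }
    }

    long latestReplicationCheckpoint() { return latestReplicationCheckpoint; }
}
```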

What is visibleReplicationCheckpoint?

After replication completes on a replica, the replica calls updateVisibleCheckpoint, which makes a transport call to the primary.

UpdateVisibleCheckpointRequestHandler is registered with the transportService in SegmentReplicationSourceService.
It updates the visibleCheckPoint for the replica shard on the primary shard.
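On the primary side, this amounts to a per-replica map update. A minimal sketch, assuming checkpoints are long versions keyed by replica allocation id; the request record and tracker class are hypothetical. Keeping the maximum is an extra assumption so that a delayed, out-of-order request cannot move a replica backwards.

```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical request carried by the "update visible checkpoint" transport call.
record UpdateVisibleCheckpointRequest(String replicaAllocationId, long checkpointVersion) {}

final class PrimaryCheckpointTracker {
    // allocationId -> last checkpoint version the replica reported as visible
    private final Map<String, Long> visibleCheckpoints = new ConcurrentHashMap<>();

    // Invoked when a replica finishes replicating and reports its new visible checkpoint.
    void handle(UpdateVisibleCheckpointRequest req) {
        visibleCheckpoints.merge(req.replicaAllocationId(), req.checkpointVersion(), Math::max);
    }

    // -1 means the primary has never heard from this replica (e.g. a search replica).
    long visibleCheckpoint(String allocationId) {
        return visibleCheckpoints.getOrDefault(allocationId, -1L);
    }
}
```

A search replica never sends this request, so under the reader/writer split its entry simply never appears on the primary.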

Segment replication in regular replicas and search replicas:

Segment replication in a regular replica is driven by the checkpoint received from the primary, whereas a search replica pulls the checkpoint from the remote store.

SegmentReplicator has two startReplication methods:

  1. startReplication with a source
    1. Called when replication is triggered by a checkpoint that a regular replica receives from the primary
  2. startReplication without a source
    1. Called at a regular interval on the node (for search replicas, where replication is from the remote store)
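The two entry points can be pictured as a pair of overloads, where the no-argument variant resolves its own source and delegates. This is a hypothetical sketch, not the SegmentReplicator signatures: the source is reduced to a string label and the remote-store lookup returns a fixed placeholder value.

```java
import java.util.*;

// Hypothetical sketch of the push and pull entry points described above.
final class SegmentReplicatorSketch {
    final List<String> log = new ArrayList<>();

    // 1. Push path (regular replica): triggered by a checkpoint received from the primary.
    void startReplication(long receivedCheckpoint, String source) {
        log.add("replicate " + receivedCheckpoint + " from " + source);
    }

    // 2. Pull path (search replica): triggered on a node-level interval;
    //    the source is resolved internally rather than passed in.
    void startReplication() {
        long remoteCheckpoint = fetchLatestFromRemoteStore();
        startReplication(remoteCheckpoint, "remote-store");
    }

    // Hypothetical placeholder for reading the latest checkpoint metadata from the remote store.
    long fetchLatestFromRemoteStore() {
        return 42L;
    }
}
```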

Thanks @mch2 for helping me onboard onto this issue.

Since the section above provides detailed background on this issue, I will add the proposed solution details in the following comments.

@vinaykpud
Contributor

vinaykpud commented Nov 13, 2024

Solution 1: No change to the existing calculation; stats for search replicas are calculated differently

We will keep the current calculation for regular replicas, but also start including search replicas in the stats.

This can be achieved by calculating the stats on the search replica shard, returning them, and using them in the coordinator to enrich the overall stats with the search replica stats.

Pros:

  • No changes in the existing way of calculation for regular replica

Cons:

  • Stats calculated for a search replica don't compare it with the checkpoint on the primary shard.
  • Stats are calculated differently for search replicas, which changes the definition or meaning of the metrics for search replicas.
  • Since we present regular replica and search replica data together in the response, users may be confused by the difference in the meaning of these metrics.

Deep dive on implementation:

We need to calculate the SegmentReplicationShardStats for each search replica shard individually.

For the CheckPointBehindCount and BytesBehindCount calculations, we can use the latestReplicationCheckpoint at the shard and the latestReplicationCheckpoint consumed by the replication process. This gives the replication lag with respect to the latest received checkpoint.

For the CurrentReplicationLag and LastCompletedReplicationLag calculations, we can use the timer available in the SegmentReplicationState.

In the coordinator node, we can use the SegmentReplicationShardStats from the search replicas to enrich the existing stats computed from the regular replicas.
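Solution 1 could look roughly like this. It is a sketch under loud assumptions: each received checkpoint carries a hypothetical dense sequence number (so the checkpoint count is a subtraction), BytesBehindCount is approximated by a total-size difference purely for brevity, and the lag values are passed in as if read from the SegmentReplicationState timer. ShardStat and Solution1 are hypothetical names.

```java
import java.util.*;

// Hypothetical flattened form of SegmentReplicationShardStats.
record ShardStat(String allocationId, long checkpointsBehind, long bytesBehind,
                 long currentLagMillis, long lastCompletedLagMillis) {}

final class Solution1 {
    // Computed on the search replica: latest checkpoint *received* (pulled from the remote store)
    // versus the checkpoint its replication process has *consumed*. No primary involved.
    static ShardStat searchReplicaStat(String allocationId,
                                       long latestReceivedSeq, long latestReceivedBytes,
                                       long consumedSeq, long consumedBytes,
                                       long currentTimerMillis, long lastTimerMillis) {
        return new ShardStat(allocationId,
                Math.max(0, latestReceivedSeq - consumedSeq),      // assumes dense sequence numbers
                Math.max(0, latestReceivedBytes - consumedBytes),  // simplification of a file-level diff
                currentTimerMillis, lastTimerMillis);
    }

    // Computed on the coordinator: union of primary-computed and search-replica-computed stats.
    static Set<ShardStat> enrich(Set<ShardStat> fromPrimary, Collection<ShardStat> fromSearchReplicas) {
        Set<ShardStat> all = new HashSet<>(fromPrimary);
        all.addAll(fromSearchReplicas);
        return all;
    }
}
```

The union is the crux of the "Cons" above: entries in the same set are measured against different baselines (the primary for regular replicas, the replica's own received checkpoint for search replicas).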

@vinaykpud
Contributor

vinaykpud commented Nov 13, 2024

Solution 2: Calculate the stats for each replica at the replica shard level

We can calculate the stats at the replicas and return them to the coordinator. The coordinator will combine all the stats received from the replicas.

Pros:

  • We can unify the metrics meaning and definition irrespective of the replica types.

Cons:

  • This changes the overall definition of the stats published by this API, which is a major change.
  • We won't be comparing checkpoints between the primary and the replicas anymore, so we won't know the answer to “how far is the replica behind the primary?”

Deep dive on implementation:

We need to calculate the SegmentReplicationShardStats for the search replica shard and regular replica shard individually without referencing the primaryShard.

For the CheckPointBehindCount and BytesBehindCount calculations, we can use the latestReplicationCheckpoint at the shard and the latestReplicationCheckpoint consumed by the replication process. This gives the replication lag with respect to the latest received checkpoint.

For CurrentReplicationLag and LastCompletedReplicationLag calculation we can use the timer available in the SegmentReplicationState

Once every replica shard returns its SegmentReplicationShardStats, these can be combined into a Set in the coordinator node.

SegmentReplicationPerGroupStats for the index can be created in the coordinator node using the replica shard stats.

rejectionCount cannot be computed at the replicas because it comes from the pressure service; alternatively, we can return this value from the primary shard and use it in the coordinator.

rejectionCount is not applicable for the search replicas.
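The coordinator side of Solution 2 can be sketched as a group-by on shard id. This is a minimal illustration with hypothetical records: every replica (regular or search) has already computed its own stats locally, and rejection counts are an optional per-shard map that the primary may or may not return, defaulting to 0.

```java
import java.util.*;

// Hypothetical flattened forms of SegmentReplicationShardStats and SegmentReplicationPerGroupStats.
record ReplicaStat(String shardId, String allocationId, long checkpointsBehind, long bytesBehind,
                   long currentLagMillis, long lastCompletedLagMillis) {}
record GroupStats(String shardId, Set<ReplicaStat> replicaStats, long rejectionCount) {}

final class Solution2Coordinator {
    // Group replica-local stats by shard and attach the (optional) primary rejection count.
    static Map<String, GroupStats> build(List<ReplicaStat> replicaStats, Map<String, Long> primaryRejections) {
        Map<String, Set<ReplicaStat>> byShard = new TreeMap<>();
        for (ReplicaStat s : replicaStats) {
            byShard.computeIfAbsent(s.shardId(), k -> new HashSet<>()).add(s);
        }
        Map<String, GroupStats> out = new TreeMap<>();
        byShard.forEach((shardId, stats) ->
                out.put(shardId, new GroupStats(shardId, stats, primaryRejections.getOrDefault(shardId, 0L))));
        return out;
    }
}
```

No checkpoint from the primary appears anywhere in this computation, which is the trade-off called out in the "Cons" above.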

Proposed metric definitions:

CheckPointBehindCount: Number of checkpoints by which the replica is behind, with respect to the latest checkpoint received by the replica.

BytesBehindCount: Number of bytes by which the replica is behind, with respect to the latest checkpoint received by the replica.

CurrentReplicationLag: Total time elapsed for the replica shard to perform the current segment replication.

LastCompletedReplicationLag: Total time elapsed for the replica shard to complete the last replication.

@vinaykpud
Contributor

vinaykpud commented Nov 14, 2024

Solution 3: Return checkpoints from replica shards; the coordinator computes the stats

We can fetch the latest visible checkpoints from each replica and compute the required stats at the coordinator.

We need to compare the latestReplicationCheckpoint from primary against the checkpoints received from each replica to build the metrics.

Pros:

  • We would still compare the replicas against the primary to compute the metrics, which keeps us consistent with the current definition of the API. We can still answer “how far is the replica behind the primary?”

Cons:

  • We will start using the SegmentReplicationState from the replicas to calculate the lag times, which will change the existing way of calculation.

Deep dive on implementation:

Every replica shard can keep track of its latestVisibleCheckPoint based on segment replication completion.
Every replica shard will return its latestVisibleCheckPoint and SegmentReplicationState.

In the coordinator node, we have the latestCheckPoint from the primary and the latestVisibleCheckPoint from the other shards.
Using these, we can calculate a set of SegmentReplicationShardStats for the replicas.
We then use each replica's SegmentReplicationState to enrich its SegmentReplicationShardStats.

SegmentReplicationState has an overallTimer, which measures the time taken for a single replication; overallTimer.time() gives the elapsed time for that replication. Similarly, we can get the lastReplicationLag from the state using the overallTimer.

Build SegmentReplicationPerGroupStats at the coordinator:

  1. ShardId
  2. Set of SegmentReplicationShardStats for the replicas
  3. rejectionCount comes from the pressure service, so we may need to return this from the primary shard to use it here.

Return the overall response from the coordinator.
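Solution 3 can be sketched as a per-group comparison at the coordinator. This is an illustration under assumptions: checkpoints are modelled as a hypothetical dense sequence number plus a map of segment file name to length, lag values are taken as already read from each replica's overallTimer, and all record and class names are hypothetical.

```java
import java.util.*;

// Hypothetical payloads returned to the coordinator by the primary and by each replica.
record PrimaryReport(String shardId, long latestSeq, Map<String, Long> latestFiles, long rejectionCount) {}
record ReplicaReport(String shardId, String allocationId, long visibleSeq, Map<String, Long> visibleFiles,
                     long currentReplicationMillis, long lastReplicationMillis) {}
record ComputedStat(String allocationId, long checkpointsBehind, long bytesBehind,
                    long currentLagMillis, long lastCompletedLagMillis) {}
record Group(String shardId, List<ComputedStat> replicas, long rejectionCount) {}

final class Solution3Coordinator {
    static Group build(PrimaryReport primary, List<ReplicaReport> replicas) {
        List<ComputedStat> stats = new ArrayList<>();
        for (ReplicaReport r : replicas) {
            if (!r.shardId().equals(primary.shardId())) {
                continue; // only replicas of this replication group
            }
            // Bytes of files in the primary's latest checkpoint that the replica does not have yet.
            long bytes = 0;
            for (Map.Entry<String, Long> f : primary.latestFiles().entrySet()) {
                if (!f.getValue().equals(r.visibleFiles().get(f.getKey()))) {
                    bytes += f.getValue();
                }
            }
            stats.add(new ComputedStat(r.allocationId(),
                    Math.max(0, primary.latestSeq() - r.visibleSeq()), // assumes dense sequence numbers
                    bytes,
                    r.currentReplicationMillis(),  // from the replica's SegmentReplicationState
                    r.lastReplicationMillis()));
        }
        return new Group(primary.shardId(), stats, primary.rejectionCount());
    }
}
```

Regular and search replicas go through the same code path here, and the primary never talks to a replica; the coordinator is the only place both views meet.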

Proposed metric definitions:

CheckPointBehindCount: Number of checkpoints by which the replica is behind the primary shard.

BytesBehindCount: Number of bytes by which the replica is behind the primary shard.

CurrentReplicationLag: Total time elapsed for the replica shard to perform the current segment replication.

LastCompletedReplicationLag: Total time elapsed for the replica shard to complete the last replication.

@vinaykpud
Contributor

So, in summary:

Choosing between these options really depends on which metrics we care about and how we want to interpret them.

  • Solutions 2 and 3 unify the meaning of the metrics across search and regular replicas, whereas Solution 1 makes the meaning depend on the replica type.
  • Solution 3 provides a way to answer “how far is the replica behind the primary?”, whereas Solution 2 gives that up and calculates the stats without referring to the primary shard checkpoint.

@mch2
Member Author

mch2 commented Nov 18, 2024

Thanks @vinaykpud for laying this out. Which approach do you think is right?

We shouldn't change the meaning of returned metrics within a minor release, even for a cat API. So I'm leaning towards doing 1 with the introduction of the feature, and 2 or 3 for the next major?

There are really three things at play here that all depend on these primary-collected stats: the cat segrep API, segrep stats returned via node stats, and the segrep backpressure mechanism. I think we tried to get too precise with the definition of replication lag in the segrep implementation, and it has left us with a lot of unnecessary complexity. So I would be in favor of dramatically simplifying all three by computing replication lag on the fly rather than precomputing it, OR doing away with primary comparison entirely and only showing stats on ongoing syncs... I think that would mean:

Cat segrep - Show a point-in-time view of download stats for each replica. Similar to _cat/recovery in what it returns, and it does not provide any primary comparison. These stats are already available with detailed set to true; we could make this the default and deprecate the existing output. We could optionally provide the primary comparison here at a shard level, but I'm not sure the value is there, and it makes the API expensive for large clusters.

Node stats - Show aggregate stats about the ongoing replication events running on a node; this answers the question of how long it is taking to sync and how many bytes. Computed locally on each shard based only on ongoing replication events, it does not give precise replication lag stats under our current definition. Intended to be used to identify problematic nodes or hot spots.

Segrep backpressure - From what I have seen in practice, the regular indexing and remote store backpressure mechanisms are preferred over segrep pressure, as they are less aggressive and still serve the same purpose. So I would be in favor of deprecating/removing this. We can still provide shard failure mechanisms that fail shards local to a node based on replication events that take too long, or optionally include primary comparison at the replication group level.
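The node stats idea and the "fail shards whose replication takes too long" idea can both be computed from in-flight events alone. A minimal sketch, assuming a hypothetical OngoingReplication event record with a start time and byte counters; the aggregate fields and the time limit are illustrative choices, not an agreed design.

```java
import java.util.*;

// Hypothetical in-flight replication event on this node.
record OngoingReplication(String shardId, long startedAtMillis, long bytesToFetch, long bytesFetched) {}
record NodeReplicationStats(int ongoing, long maxElapsedMillis, long totalBytesRemaining) {}

final class NodeStatsSketch {
    // Aggregate over ongoing events only: no primary comparison involved.
    static NodeReplicationStats aggregate(Collection<OngoingReplication> events, long nowMillis) {
        long maxElapsed = 0, remaining = 0;
        for (OngoingReplication e : events) {
            maxElapsed = Math.max(maxElapsed, nowMillis - e.startedAtMillis());
            remaining += Math.max(0, e.bytesToFetch() - e.bytesFetched());
        }
        return new NodeReplicationStats(events.size(), maxElapsed, remaining);
    }

    // Candidates for a node-local shard-failure mechanism: events running longer than the limit.
    static List<String> tooSlow(Collection<OngoingReplication> events, long nowMillis, long limitMillis) {
        List<String> slow = new ArrayList<>();
        for (OngoingReplication e : events) {
            if (nowMillis - e.startedAtMillis() > limitMillis) {
                slow.add(e.shardId());
            }
        }
        return slow;
    }
}
```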

@vinaykpud
Contributor

Thanks @mch2. I think the choice of approach depends on whether we need to compare with the primary or just return the ongoing replication stats. Based on the discussion, it looks like we are leaning towards the latter, i.e. Solution 2.

Since this involves a definition change to the existing cat segrep API, we can implement and refactor incrementally: do Solution 1 first and then move to Solution 2.

Projects
Status: Done