Cancel recoveries even if all shards assigned #46520
Hi @howardhuanghua, we have found your signature in our records, but it seems like you have signed with a different e-mail than the one used in your Git commit. Can you please add both of these e-mails into your Github profile (they can be hidden), so we can match your e-mails to your Github profile? |
Pinging @elastic/es-distributed |
Thanks for the suggestion @howardhuanghua. I think we need to understand the situation you're describing a little more clearly. If your indices are successfully synced-flushed then it shouldn't matter if some of them start to recover onto other nodes because those recoveries should quickly be cancelled by recoveries onto the restarted node. Are you saying this is not the case? Can you share logs from such a restart with Also can you supply tests that support your change? |
I think I see an issue: Lines 399 to 403 in 67e5ad2
We only consider cancelling ongoing recoveries if there are unassigned shards. I think this might explain why Elasticsearch spends time completing a recovery despite the synced-flushed shard on the restarted node. Is it the case that most of the recoveries are cancelled, but the last few run to completion? |
@DaveCTurner Yes, only a few shards run to completion. lb_backend_server-300@1568044800000_7 2 r INITIALIZING 9.28.82.74 1527044744023702309 |
Thanks for confirming. In this case, I think it'd be better to contemplate cancelling recoveries even if there are no unallocated shards, rather than relying on the delayed allocation timeout as you propose. |
@DaveCTurner Thanks. I will continue to check the cancelling recoveries. |
Hi @DaveCTurner, we have looked into the slow recovery issue and have a new proposal. Why are some shards allocated to other nodes? Lines 399 to 403 in 67e5ad2
The logic above tries to handle unassigned shards where valid copies of the shard already exist, but the following shard-checking logic only considers syncId or sizeMatched between primary and replica. elasticsearch/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java Lines 369 to 380 in 67e5ad2
This has the following issues:
These issues cause the unassigned shard to fall through to the next allocation step: Line 405 in 67e5ad2
Before the allocation delay timeout expires, these unassigned shards are allocated to other nodes. Why can the allocation not be cancelled? elasticsearch/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java Line 369 in 67e5ad2
This prevents the cancelling logic from taking effect: elasticsearch/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java Lines 110 to 112 in 67e5ad2
Because the allocation to a new node cannot be cancelled, restarting takes a long time. Our proposal: we have another proposal to solve the issue. The key point: elasticsearch/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java Lines 369 to 380 in 67e5ad2
There are several levels at which to check whether the unassigned shards should be relocated:
Please help evaluate this. If it is OK, we can provide a patch. Thanks. |
You are right that we could use sequence numbers to make a better allocation decision in the case that there is no sync id too, but we are already working on this in #46318. |
Hi @DaveCTurner, we have checked sequence-number-based replica allocation. It could handle the first phase of rerouting unassigned shards. Do you think we still need to avoid allocating shards to other nodes before the node-left delay timeout, in case phase 1 has any issues? This is what the PR describes as phase 2. |
I'm sorry I don't really understand the question. What is phase 2? |
Hi @DaveCTurner, in the reroute method, the first phase tries to allocate unassigned shards to a node that already has a data copy: Lines 399 to 403 in 67e5ad2
Sequence-number-based replica allocation should be in phase 1 to check for an existing data copy; please correct me if I am wrong. Phase 2 tries to allocate unassigned shards to the best-matching node, including a new node (no data copy): Line 405 in 67e5ad2
Line 122 in 67e5ad2
So I provided the above PR to avoid allocating shards to other nodes before the node-left delay timeout in phase 2. |
Sorry @howardhuanghua I am perhaps misunderstanding the issue you are trying to fix with this PR. Can you add a test case that this change fixes? I think that would make things clearer. You might find it helpful to look at |
Thanks @howardhuanghua, I think this test is not realistic because of the skipAllocation flag you've added to the DelayedShardsMockGatewayAllocator. I have left a more detailed comment inline.
@@ -268,6 +271,9 @@ public void applyFailedShards(RoutingAllocation allocation, List<FailedShard> fa

    @Override
    public void allocateUnassigned(RoutingAllocation allocation) {
        if (this.skipAllocation) {
            return;
I don't understand this addition. As far as I can tell this makes this mock allocator behave quite differently from the real GatewayAllocator doesn't it? Your test only fails because of this difference in behaviour: if I comment this line out then your test passes without any changes to the production code. Can you provide a test case using the real allocator? I suggest adding to DelayedAllocationIT rather than here to ensure the test matches the production code more closely.
I just want to simulate the case where an unassigned shard cannot be allocated in GatewayAllocator, in which case it also needs to be delayed in ShardsAllocator. Suppose we skip GatewayAllocator allocation: without this PR, if I remove the node that has the replica shard, the unassigned shard is allocated immediately to the other node; with this PR, it is still delayed until delayed_timeout.
Since the sequence-number-based allocation decision can select the correct node for allocation, this PR has a side effect on phase 2 allocation: it delays shard allocation until delayed_timeout. For more information please see #46520 (comment).
Sorry @DaveCTurner, I didn't add comments in time after committing the test case. Let me try to explain our idea clearly. Let's say we have a cluster with 3 nodes (A/B/C) and some indices, one of which, named test, has 1 primary shard and 1 replica shard. The test index shard layout is node A (shard p0), node B (shard r0), and node C. Set "index.unassigned.node_left.delayed_timeout" to 5 mins. The following steps create the scenario:
In the next step, the unassigned r0 will be handled in Line 122 in 67e5ad2
In this step, if node B gets throttled by other shards, r0 could be allocated to node C and cannot be cancelled, so all the segment files need to be copied from p0. Our PR above stops r0 from being allocated to node C before the allocation delay timeout in phase 2. However, if the allocation decision is based on sequence numbers (#46318), r0 would be allocated to node B in phase 1 above if p0 on node A still has the complete operation history. If r0 cannot be allocated in phase 1, it must have no reusable data copy or complete operation history, and we should relocate it to a new node immediately. In this case our PR would still wait until delayed_timeout, which may not be appropriate. So, all in all, we plan to fix only the issue that ongoing recoveries cannot be cancelled, as you mentioned in #46520 (comment). If you think that's OK then we will provide the patch for this issue only. Please share any advice you have, thanks a lot. |
Thanks @howardhuanghua for your patient explanations. I think I now understand the issue more clearly, and I am more certain that it will be fixed by the seqno-based replica shard allocator that we're working on.
This is crucial to the issue you are seeing: the primary and replica must have absolutely nothing in common to hit this issue. If they share even a single segment (or a sync id) then we will hit the following code instead, and this will correctly throttle the allocation in the elasticsearch/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java Lines 202 to 216 in 49767fc
Once the
Yes, we'd very much appreciate a fix for that :) |
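To make the "nothing in common" condition concrete, here is a toy model of the matching idea discussed in this thread (a sync id match, or at least one shared segment file). It is only a sketch under stated assumptions, not the code in ReplicaShardAllocator: the class `StoreMatchSketch`, the type `StoreCopy`, and the rule "same name and same length means identical file" are all hypothetical simplifications.

```java
import java.util.Map;

public class StoreMatchSketch {

    /** Hypothetical summary of one shard copy: an optional sync id plus its segment files. */
    static final class StoreCopy {
        final String syncId;            // null when the copy has no sync id
        final Map<String, Long> files;  // segment file name -> length in bytes

        StoreCopy(String syncId, Map<String, Long> files) {
            this.syncId = syncId;
            this.files = files;
        }
    }

    /** Returns Long.MAX_VALUE on a sync id match, else the total bytes of files found in both copies. */
    static long matchingBytes(StoreCopy primary, StoreCopy replica) {
        if (primary.syncId != null && primary.syncId.equals(replica.syncId)) {
            return Long.MAX_VALUE;
        }
        long bytes = 0;
        for (Map.Entry<String, Long> file : replica.files.entrySet()) {
            Long primaryLength = primary.files.get(file.getKey());
            // Assumption: equal name and equal length stands in for "identical file".
            if (primaryLength != null && primaryLength.equals(file.getValue())) {
                bytes += file.getValue();
            }
        }
        return bytes;
    }

    /** A copy that shares nothing with the primary gives no reason to prefer its node. */
    static boolean hasReusableData(StoreCopy primary, StoreCopy replica) {
        return matchingBytes(primary, replica) > 0;
    }
}
```

In this model a replica copy with no sync id and no shared files scores zero, which corresponds to the case described above where the shard falls through to the general allocation step.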
Hi @DaveCTurner, thanks for the response. I am glad that we are now on the same page :). I will provide the fix soon. |
Hi @DaveCTurner, I have updated the commit, added |
Thanks @howardhuanghua, can you also supply some tests for this change? We need at least an ESIntegTestCase showing that it does cancel the last batch of recoveries. I would guess you could add this to org.elasticsearch.indices.recovery.IndexRecoveryIT.
-        // now allocate all the unassigned to available nodes
-        if (allocation.routingNodes().unassigned().size() > 0) {
+        // now allocate all the unassigned to available nodes or cancel existing recoveries if we have a better match
+        if (allocation.routingNodes().unassigned().size() > 0 || allocation.routingNodes().hasInactiveShards()) {
I wonder: why not remove this condition entirely?
Thanks @DaveCTurner, I have updated the commit. |
Hi @DaveCTurner, would you please help to check the updated commit again? Thank you. |
Thanks @howardhuanghua. The test you provided failed when I ran it locally (see below for details). It's normally a good idea to run tests like this repeatedly since they are not fully deterministic and might not fail every time. That said, it looks like it's doing roughly the right things and I left some ideas for small improvements.
    List<RecoveryState> nodeARecoveryStates = findRecoveriesForTargetNode(nodeA, recoveryStates);
    assertThat(nodeARecoveryStates.size(), equalTo(1));
    List<RecoveryState> nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates);
    assertThat(nodeCRecoveryStates.size(), equalTo(1));
When I ran this test it failed here:
2> REPRODUCE WITH: ./gradlew ':server:integTest' --tests "org.elasticsearch.indices.recovery.IndexRecoveryIT.testCancelNewShardRecoveryAndUsesExistingShardCopy {seed=[ECDF910E1F356F6D:FFC9E32BAD24745B]}" -Dtests.seed=ECDF910E1F356F6D -Dtests.security.manager=true -Dtests.jvms=4 -Dtests.locale=it -Dtests.timezone=America/Rio_Branco -Dcompiler.java=12 -Druntime.java=12
2> java.lang.AssertionError:
Expected: <1>
but: was <0>
at __randomizedtesting.SeedInfo.seed([ECDF910E1F356F6D:FFC9E32BAD24745B]:0)
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
at org.junit.Assert.assertThat(Assert.java:956)
at org.junit.Assert.assertThat(Assert.java:923)
    assertBusy(() -> assertThat(client().admin().indices().prepareSyncedFlush(INDEX_NAME).get().failedShards(), equalTo(0)));

    logger.info("--> slowing down recoveries");
    slowDownRecovery(shardSize);
slowDownRecovery is for testing the throttling behaviour and is not sufficient here as there is still a chance that the recovery finishes before it is cancelled and this will cause the test to fail. I think we must completely halt the recovery until it has been cancelled. I would do this by capturing either the START_RECOVERY action (see testRecoverLocallyUpToGlobalCheckpoint for instance) or one of the subsidiary requests (e.g. CLEAN_FILES as done in testOngoingRecoveryAndMasterFailOver).
    assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").get().isTimedOut());

    // do sync flush to gen sync id
    assertBusy(() -> assertThat(client().admin().indices().prepareSyncedFlush(INDEX_NAME).get().failedShards(), equalTo(0)));
Is an assertBusy necessary here? I think a failure of a synced flush is unexpected and should result in a test failure.
    slowDownRecovery(shardSize);

    logger.info("--> stop node B");
    internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeB));
It is better to use internalCluster().restartNode() which takes a RestartCallback whose onNodeStopped method is a good place to do things to the cluster while the node is stopped.
    logger.info("--> request recoveries");
    // peer recovery from nodeA to nodeC should be canceled, replica should be allocated to nodeB that has the data copy
    assertBusy(() -> {
Could we ensureGreen here instead of this assertBusy? I think all of the assertions in here should hold for sure once the cluster is green again.
Also could you merge the latest master, because there are now some conflicts that need resolving. |
703895a to dc25223
Thanks @DaveCTurner, I have updated the test case based on your suggestion. While the replica shard's node is restarting, the test holds the peer recovery from the primary shard to the new node, checks the peer recovery source/target once the replica shard's node has stopped, and finally makes sure the cluster is green before releasing the held peer recovery. Please help to review again. Thanks for your help! |
Thanks @howardhuanghua I left a few more comments but this is looking very good.
    new InternalTestCluster.RestartCallback() {
        @Override
        public Settings onNodeStopped(String nodeName) throws Exception {
            assertBusy(() -> {
😁 I was just about to note the missing wait here.
I think it'd be neater to wait for node A to send its CLEAN_FILES action instead of using an assertBusy. You can use another CountDownLatch for this.
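The two-latch pattern suggested here can be illustrated in plain Java, independent of the Elasticsearch test framework. This is only a sketch: `LatchHandshakeSketch`, `requestSeen`, and `release` are hypothetical names, and the "recovery" thread merely stands in for the intercepted CLEAN_FILES request. One latch tells the test that the blocked request has been reached, the other lets the test release it, so no polling is needed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class LatchHandshakeSketch {

    /** Returns true if the simulated request was seen, stayed blocked, and finished only after release. */
    static boolean runHandshake() {
        CountDownLatch requestSeen = new CountDownLatch(1); // counted down when the blocked step is reached
        CountDownLatch release = new CountDownLatch(1);     // counted down when the test allows it to proceed
        AtomicBoolean completed = new AtomicBoolean(false);

        // Stand-in for the recovery that reaches the intercepted request and is then held.
        Thread recovery = new Thread(() -> {
            requestSeen.countDown();
            try {
                if (release.await(10, TimeUnit.SECONDS)) {
                    completed.set(true);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        recovery.start();

        try {
            // Wait for the event itself instead of polling for its side effects.
            boolean seen = requestSeen.await(10, TimeUnit.SECONDS);
            // The recovery cannot have completed yet because release has not been counted down.
            boolean stillBlocked = !completed.get();
            // ... a real test would make its assertions about the cluster here ...
            release.countDown();
            recovery.join(10_000);
            return seen && stillBlocked && completed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("handshake ok: " + runHandshake());
    }
}
```

Compared with a retry loop, the test proceeds as soon as the request is observed and is guaranteed that the held step has not run to completion in the meantime.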
    RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();

    List<RecoveryState> recoveryStates = response.shardRecoveryStates().get(INDEX_NAME);
    List<RecoveryState> nodeARecoveryStates = findRecoveriesForTargetNode(nodeA, recoveryStates);
I think we do not need to say anything about the recoveries on node A. These assertions are true, but not particularly important for this test.
        }
    });

    // wait for peer recovering from nodeA to nodeB to be finished
It took me some time to work out why this works - I suggest this comment explaining it:
-    // wait for peer recovering from nodeA to nodeB to be finished
+    // wait for peer recovery from nodeA to nodeB which is a no-op recovery so it skips the CLEAN_FILES stage and hence is not blocked
server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
    final String nodeA = internalCluster().startNode();

    logger.info("--> create index on node: {}", nodeA);
    ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT)
shardSize is unused:
-    ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT)
+    createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT).getShards()[0].getStats().getStore().size();
    logger.info("--> start node B");
    // force a shard recovery from nodeA to nodeB
    final String nodeB = internalCluster().startNode();
    Settings nodeBDataPathSettings = internalCluster().dataPathSettings(nodeB);
nodeBDataPathSettings is unused:
-    Settings nodeBDataPathSettings = internalCluster().dataPathSettings(nodeB);
    logger.info("--> start node C");
    final String nodeC = internalCluster().startNode();
    assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").get().isTimedOut());
I'd normally recommend the shorthand
-    assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").get().isTimedOut());
+    ensureStableCluster(3);
but I don't think this is necessary:
- startNode() calls validateClusterFormed()
- anyway it doesn't matter if node C takes a bit longer to join the cluster because we have to wait for its recovery to start which only happens after it's joined.
Therefore I think we can drop this:
-    assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").get().isTimedOut());
Hi @DaveCTurner, appreciate your patient help! I have updated the test case, please help to check again. |
@elasticmachine test this please |
LGTM thanks @howardhuanghua.
We cancel ongoing peer recoveries if a node joins the cluster with a completely up-to-date copy of a shard, because we can use such a copy to recover a replica instantly. However, today we only look for recoveries to cancel while there are unassigned shards in the cluster. This means that we do not contemplate the cancellation of the last few recoveries since recovering shards are not unassigned. It might take much longer for these recoveries to complete than would be necessary if they were cancelled. This commit fixes this by checking for cancellable recoveries even if all shards are assigned.
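The effect of the gate described in this commit message can be seen in a small toy model. This is a sketch only, not the Elasticsearch allocator: `CancelRecoveryGateSketch`, `Shard`, `rerouteOld`, and `rerouteNew` are hypothetical names, and "a better copy exists" is reduced to a boolean flag.

```java
import java.util.ArrayList;
import java.util.List;

public class CancelRecoveryGateSketch {

    enum State { UNASSIGNED, INITIALIZING, STARTED }

    /** Hypothetical stand-in for a shard routing entry. */
    static final class Shard {
        final String id;
        State state;
        final boolean betterCopyAvailable; // a rejoined node holds an up-to-date copy of this shard

        Shard(String id, State state, boolean betterCopyAvailable) {
            this.id = id;
            this.state = state;
            this.betterCopyAvailable = betterCopyAvailable;
        }
    }

    static boolean anyUnassigned(List<Shard> shards) {
        for (Shard shard : shards) {
            if (shard.state == State.UNASSIGNED) {
                return true;
            }
        }
        return false;
    }

    /** Cancels every ongoing recovery for which a better copy exists; returns the cancelled ids. */
    static List<String> cancelRecoveries(List<Shard> shards) {
        List<String> cancelled = new ArrayList<>();
        for (Shard shard : shards) {
            if (shard.state == State.INITIALIZING && shard.betterCopyAvailable) {
                shard.state = State.UNASSIGNED; // to be reallocated to the node with the better copy
                cancelled.add(shard.id);
            }
        }
        return cancelled;
    }

    /** Old gate: cancellations are only considered while some shard is unassigned. */
    static List<String> rerouteOld(List<Shard> shards) {
        return anyUnassigned(shards) ? cancelRecoveries(shards) : new ArrayList<>();
    }

    /** New gate: cancellations are considered on every reroute. */
    static List<String> rerouteNew(List<Shard> shards) {
        return cancelRecoveries(shards);
    }

    public static void main(String[] args) {
        List<Shard> shards = new ArrayList<>();
        shards.add(new Shard("p0", State.STARTED, false));
        shards.add(new Shard("r0", State.INITIALIZING, true)); // last recovery, nothing unassigned
        System.out.println("old gate cancels: " + rerouteOld(shards));
        System.out.println("new gate cancels: " + rerouteNew(shards));
    }
}
```

With no unassigned shards left, the old gate never examines the recovering replica, whereas the new gate cancels it so that it can be assigned to the node holding the up-to-date copy.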
Issue
Sometimes we need to perform a rolling restart for static configuration changes to take effect, or to upgrade the whole cluster. We tested a cluster with 6 nodes, 10TB of data in total, and 6000+ shards with 1 replica. Before restarting we performed a synced flush; each node took 10+ mins to get the cluster back to GREEN, and a rolling restart of all nodes took more than 1 hour. For 100+ nodes, an upgrade needs more than one day.
After going through the related logic, we found an issue. Take 3 nodes A, B, C as an example:
Tested versions: 5.6.4, 6.4.3, 7.3.1.
Related settings:
"index.unassigned.node_left.delayed_timeout": 3m
"cluster.routing.allocation.node_concurrent_recoveries": 30
"indices.recovery.max_bytes_per_sec": 40mb
One node (A) restart flow:
Solution
With the optimization in this PR, the restart time for one node in the case above could drop from 10+ mins to around 1 min. The main logic:
After the restarted node comes back, if it gets throttled, do not relocate unassigned shards to other nodes before the delayed allocation timeout. In most cases this avoids copying segment files from a remote node.