Controlling discovery for decommissioned nodes #4590
@@ -42,6 +42,7 @@
 import org.opensearch.cluster.ClusterStateTaskListener;
 import org.opensearch.cluster.NotClusterManagerException;
 import org.opensearch.cluster.coordination.Coordinator.Mode;
+import org.opensearch.cluster.decommission.NodeDecommissionedException;
 import org.opensearch.cluster.metadata.Metadata;
 import org.opensearch.cluster.node.DiscoveryNode;
 import org.opensearch.cluster.routing.RerouteService;
@@ -57,6 +58,7 @@
 import org.opensearch.monitor.StatusInfo;
 import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.threadpool.ThreadPool.Names;
+import org.opensearch.transport.RemoteTransportException;
 import org.opensearch.transport.TransportChannel;
 import org.opensearch.transport.TransportException;
 import org.opensearch.transport.TransportRequest;
@@ -78,6 +80,7 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
@@ -118,6 +121,7 @@ public class JoinHelper {
     private final AtomicReference<FailedJoinAttempt> lastFailedJoinAttempt = new AtomicReference<>();

     private final Supplier<JoinTaskExecutor> joinTaskExecutorGenerator;
+    private final Consumer<Boolean> nodeCommissioned;

     JoinHelper(
         Settings settings,
@@ -130,12 +134,14 @@ public class JoinHelper {
         Function<StartJoinRequest, Join> joinLeaderInTerm,
         Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators,
         RerouteService rerouteService,
-        NodeHealthService nodeHealthService
+        NodeHealthService nodeHealthService,
+        Consumer<Boolean> nodeCommissioned
     ) {
         this.clusterManagerService = clusterManagerService;
         this.transportService = transportService;
         this.nodeHealthService = nodeHealthService;
         this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
+        this.nodeCommissioned = nodeCommissioned;
         this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService, transportService) {

             private final long term = currentTermSupplier.getAsLong();
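The new `Consumer<Boolean>` parameter is simply a callback through which JoinHelper reports the local node's commission status back to its owner. The sketch below illustrates that contract in isolation; the class and method names are made up for illustration and are not taken from this PR.

```java
import java.util.function.Consumer;

// Illustrative holder for the local node's commission status (names hypothetical).
// The owner keeps the flag and hands JoinHelper only a setter, so JoinHelper can flip
// it on join success or failure without depending on the owner's type.
class CommissionStatus {
    private volatile boolean localNodeCommissioned = true; // assume commissioned until told otherwise

    // This is the Consumer<Boolean> shape that would be passed to the JoinHelper constructor.
    Consumer<Boolean> asSetter() {
        return commissioned -> this.localNodeCommissioned = commissioned;
    }

    boolean isLocalNodeCommissioned() {
        return localNodeCommissioned;
    }
}
```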
@@ -342,6 +348,7 @@ public void handleResponse(Empty response) {
     pendingOutgoingJoins.remove(dedupKey);
     logger.debug("successfully joined {} with {}", destination, joinRequest);
     lastFailedJoinAttempt.set(null);
+    nodeCommissioned.accept(true);
     onCompletion.run();
 }
@@ -352,6 +359,13 @@ public void handleException(TransportException exp) {
     FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp);
     attempt.logNow();
     lastFailedJoinAttempt.set(attempt);
+    if (exp instanceof RemoteTransportException && (exp.getCause() instanceof NodeDecommissionedException)) {
+        logger.info(
+            "local node is decommissioned [{}]. Will not be able to join the cluster",
+            exp.getCause().getMessage()
+        );
+        nodeCommissioned.accept(false);
+    }
     onCompletion.run();
 }

Review thread on this check:

- The current flow is: […]. The above logic will not work if the current master-eligible node is not the active leader and is a candidate accumulating joins (code ref). Suggestion: […]
- In that case, can you please help me understand: when the current leader-eligible node is not the active leader and is just accumulating joins, do we just skip the validator code flow?
- Thanks for pointing this out @shwetathareja. As per your suggestions, implementing the changes.
- Made these changes. @shwetathareja, lmk if this looks ok.
- Hey, I explored more on join validators. I think we can keep it as a built-in join validator; Coordinator#handleJoinRequest validates all the built-in checks, so handleJoinRequest would be covered. Another case where keeping it as a built-in validator would help is handlePublishRequest. Ideally we would never hit that case, as it runs only in Mode.LEADER, but for any such cases in the future it would be good to have all the validators in one place.
- A built-in validator delays a check that could be done early, during handleJoinRequest itself. Can you help me understand the handlePublishRequest scenario?
- This is the snippet from handleJoinRequest of Coordinator. If you see, this method also runs the same built-in validators.
- We might never actually hit the …
- handlePublishRequest is executed on the node joining the master. But with the check inside …
- Correct. I meant the built-in validators are also executed in handleJoinRequest.
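To make the built-in join validator idea from the thread concrete, here is a rough sketch of what such a check could look like. It is only an illustration: the class name, the `ensureNodeCommissioned` method, and the decommission metadata accessors are assumptions based on this PR series, not code shown in the diff above.

```java
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
import org.opensearch.cluster.decommission.NodeDecommissionedException;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;

// Sketch of a built-in join validator (names and accessors assumed, not from the diff):
// reject a joining node whose attribute matches the decommissioned attribute recorded
// in cluster metadata.
public final class DecommissionJoinValidator {

    public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata) {
        DecommissionAttributeMetadata decommissionMetadata = metadata.custom(DecommissionAttributeMetadata.TYPE);
        if (decommissionMetadata == null || decommissionMetadata.decommissionAttribute() == null) {
            return; // no decommission recorded, nothing to validate
        }
        DecommissionAttribute attribute = decommissionMetadata.decommissionAttribute();
        String nodeAttributeValue = node.getAttributes().get(attribute.attributeName());
        if (attribute.attributeValue().equals(nodeAttributeValue)) {
            throw new NodeDecommissionedException(
                "node [" + node.getId() + "] has the decommissioned attribute [" + attribute.attributeName() + "]"
            );
        }
    }
}
```

Registered as a built-in validator, the same check would run wherever the join validators are applied, including Coordinator#handleJoinRequest, which is the "all validators in one place" argument made in the thread.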
Review thread on resetting the local commission status:

- Every time the OpenSearch process is bounced, localNodeCommissioned will get reset to true. Now suppose there is a decommissioned master-eligible node in the cluster, and at that point there is no active leader (temporary, no quorum loss). Then this node can become a candidate, collect votes from others, and become master. It may end up abdicating later (we need to cross-check that flow again), but it would cause unnecessary churn in the cluster. To mitigate this, JoinTaskExecutor should also ensure that the node which is going to become the active leader is not decommissioned.
- Made changes for this. LMK if you think there could be a better way to do this.
- Let's do the check in the pre-voting round itself using lastAcceptedState. There is no need for the localNodeCommissioned variable check, and there is no need for a decommission check in JoinTaskExecutor for the leader.
- Okay.
- Can we have a stale lastAcceptedState, or none at all, for a newly joining master-eligible node?
- lastAcceptedState is the state last written to its disk. If its last state thinks it was decommissioned, we can let it stay out of pre-voting and let other master-eligible nodes form the cluster. Through its ping request, it would be able to join the cluster if it was not decommissioned.
- Also, lastAcceptedState is considered the source of truth to decide whether it can win the election or not (based on configuration and exclusions).
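As a rough illustration of the pre-voting suggestion, the node could consult the decommission status recorded in its lastAcceptedState before taking part in an election round. This is a sketch under the assumption that the decommission attribute metadata from this PR series is present in the persisted state; the class and method names are hypothetical.

```java
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
import org.opensearch.cluster.node.DiscoveryNode;

// Sketch: decide whether the local node should take part in pre-voting based on the
// last cluster state it accepted (i.e. the state last persisted to disk).
final class PreVoteCommissionCheck {

    static boolean mayParticipateInPreVoting(ClusterState lastAcceptedState, DiscoveryNode localNode) {
        DecommissionAttributeMetadata decommissionMetadata = lastAcceptedState.metadata()
            .custom(DecommissionAttributeMetadata.TYPE);
        if (decommissionMetadata == null || decommissionMetadata.decommissionAttribute() == null) {
            return true; // nothing decommissioned as far as the persisted state knows
        }
        DecommissionAttribute attribute = decommissionMetadata.decommissionAttribute();
        // Stay out of pre-voting if the persisted state says the local node carries the
        // decommissioned attribute; a fresh join attempt will re-establish the real status.
        return attribute.attributeValue().equals(localNode.getAttributes().get(attribute.attributeName())) == false;
    }
}
```

If that persisted state turns out to be stale (for example, the node was recommissioned while it was down), the node simply sits out of pre-voting and re-establishes its real status through a regular ping/join attempt, as described in the thread.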