Controlling discovery for decommissioned nodes #4590

Merged
Commits (46)
a015a81
Controlling discovery for decommissioned nodes
imRishN Sep 26, 2022
0a3f262
Fix spotless check
imRishN Sep 26, 2022
24fcaaf
Add changelog
imRishN Sep 26, 2022
c9e0f5b
Empty-Commit
imRishN Sep 26, 2022
bb0547d
Add basic UT
imRishN Sep 26, 2022
2ea4dc9
Add Consumer instead of listener
imRishN Sep 28, 2022
2582c4b
Remove UT
imRishN Sep 28, 2022
70b7d48
Refactor
imRishN Sep 28, 2022
a30b83d
Merge remote-tracking branch 'upstream/main' into decommission/contro…
imRishN Sep 28, 2022
da2fa40
Fix spotless check
imRishN Sep 28, 2022
61bb893
Improve logging msg
imRishN Sep 28, 2022
d872065
Fix spotless check
imRishN Sep 28, 2022
bee48b4
Add log msg in join helper
imRishN Sep 30, 2022
d02f9ae
Update peer finder interval to 2 min during decommission
imRishN Sep 30, 2022
6c0ce6c
Move flag to coordinator
imRishN Sep 30, 2022
01f2d70
Merge remote-tracking branch 'upstream/main' into decommission/contro…
imRishN Sep 30, 2022
8f82360
Merge remote-tracking branch 'upstream/main' into decommission/contro…
imRishN Oct 2, 2022
1c497b7
Add UT
imRishN Oct 3, 2022
e4d1354
Fix spotless check
imRishN Oct 3, 2022
1ac690d
Prevent join at join execute task and at coordinator. Add UTs
imRishN Oct 4, 2022
f448d78
Merge remote-tracking branch 'upstream/main' into decommission/contro…
imRishN Oct 4, 2022
adc4a8c
Move validator appropriately
imRishN Oct 5, 2022
69f2784
Merge remote-tracking branch 'upstream/main' into decommission/contro…
imRishN Oct 5, 2022
c42586b
Fix spotless check
imRishN Oct 5, 2022
a53e578
Make method pkg private
imRishN Oct 5, 2022
edff382
Ensure decommissioned node don't become leader
imRishN Oct 5, 2022
46c49b0
Add static helper for decommission flow
imRishN Oct 6, 2022
63f6c00
Updates in pre voting round
imRishN Oct 6, 2022
b8cf3fe
Move commission check
imRishN Oct 6, 2022
6a4c16f
Move commission check
imRishN Oct 6, 2022
50a6e67
Move helpers to Service
imRishN Oct 6, 2022
c9db6b1
Fix executor
imRishN Oct 6, 2022
bb6f573
Remove UT
imRishN Oct 6, 2022
8dd4457
Fix spotless check
imRishN Oct 6, 2022
19c524d
Minor
imRishN Oct 6, 2022
c5b1c0d
Merge remote-tracking branch 'upstream/main' into decommission/contro…
imRishN Oct 6, 2022
41cc099
Add built in join validator
imRishN Oct 6, 2022
260641a
Fix
imRishN Oct 6, 2022
8e161a6
Add UT for join
imRishN Oct 7, 2022
4a932b7
Merge remote-tracking branch 'upstream/main' into decommission/contro…
imRishN Oct 7, 2022
05d073e
Fix spotless check
imRishN Oct 7, 2022
3b6c1b7
Changes in coordinator
imRishN Oct 7, 2022
8b94b8d
Add UT for coordinator
imRishN Oct 7, 2022
165b961
Fix spotless check
imRishN Oct 7, 2022
e4e1b5c
Add test for execute method of task
imRishN Oct 7, 2022
0b3f106
Empty-Commit
imRishN Oct 7, 2022
Files changed
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -47,6 +47,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318))
- Unmute test RelocationIT.testRelocationWhileIndexingRandom ([#4580](https://github.com/opensearch-project/OpenSearch/pull/4580))
- Add DecommissionService and helper to execute awareness attribute decommissioning ([#4084](https://github.com/opensearch-project/OpenSearch/pull/4084))
- Controlling discovery for decommissioned nodes ([#4590](https://github.com/opensearch-project/OpenSearch/pull/4590))


### Deprecated
Coordinator.java
@@ -208,19 +208,6 @@ public Coordinator(
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.singleNodeDiscovery = DiscoveryModule.isSingleNodeDiscovery(settings);
this.electionStrategy = electionStrategy;
this.joinHelper = new JoinHelper(
settings,
allocationService,
clusterManagerService,
transportService,
this::getCurrentTerm,
this::getStateForClusterManagerService,
this::handleJoinRequest,
this::joinLeaderInTerm,
this.onJoinValidators,
rerouteService,
nodeHealthService
);
this.persistedStateSupplier = persistedStateSupplier;
this.noClusterManagerBlockService = new NoClusterManagerBlockService(settings, clusterSettings);
this.lastKnownLeader = Optional.empty();
@@ -244,6 +231,20 @@ public Coordinator(
new HandshakingTransportAddressConnector(settings, transportService),
configuredHostsResolver
);
this.joinHelper = new JoinHelper(
settings,
allocationService,
clusterManagerService,
transportService,
this::getCurrentTerm,
this::getStateForClusterManagerService,
this::handleJoinRequest,
this::joinLeaderInTerm,
this.onJoinValidators,
rerouteService,
nodeHealthService,
peerFinder.nodeCommissionedListener()
);
this.publicationHandler = new PublicationTransportHandler(
transportService,
namedWriteableRegistry,
@@ -1451,6 +1452,11 @@ public void run() {
return;
}

if (peerFinder.localNodeDecommissioned()) {
logger.debug("skip prevoting as local node is decommissioned");
return;
}

if (prevotingRound != null) {
prevotingRound.close();
}
JoinHelper.java
@@ -42,6 +42,7 @@
import org.opensearch.cluster.ClusterStateTaskListener;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.coordination.Coordinator.Mode;
import org.opensearch.cluster.decommission.NodeDecommissionedException;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RerouteService;
@@ -57,6 +58,7 @@
import org.opensearch.monitor.StatusInfo;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.RemoteTransportException;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportRequest;
@@ -112,6 +114,7 @@ public class JoinHelper {

private final TimeValue joinTimeout; // only used for Zen1 joining
private final NodeHealthService nodeHealthService;
private final ActionListener<Void> nodeCommissionedListener;

private final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>());

@@ -130,12 +133,14 @@ public class JoinHelper {
Function<StartJoinRequest, Join> joinLeaderInTerm,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators,
RerouteService rerouteService,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
ActionListener<Void> nodeCommissionedListener
) {
this.clusterManagerService = clusterManagerService;
this.transportService = transportService;
this.nodeHealthService = nodeHealthService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
this.nodeCommissionedListener = nodeCommissionedListener;
this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService, transportService) {

private final long term = currentTermSupplier.getAsLong();
@@ -342,6 +347,7 @@ public void handleResponse(Empty response) {
pendingOutgoingJoins.remove(dedupKey);
logger.debug("successfully joined {} with {}", destination, joinRequest);
lastFailedJoinAttempt.set(null);
nodeCommissionedListener.onResponse(null);
onCompletion.run();
}

@@ -352,6 +358,10 @@ public void handleException(TransportException exp) {
FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp);
attempt.logNow();
lastFailedJoinAttempt.set(attempt);
if (exp instanceof RemoteTransportException && (exp.getCause() instanceof NodeDecommissionedException)) {
(Inline review thread on this change; imRishN later marked it resolved.)

shwetathareja (Member):

The current flow is:

  1. The node sends the JoinRequest to the active leader.
  2. The active leader sends the ValidateJoinRequest.
  3. The node checks for decommissioning and fails the request if it has the decommissioned attribute.

The above logic will not work if the current master-eligible node is not the active leader and is a candidate accumulating joins (code ref):

 if (stateForJoinValidation.nodes().isLocalNodeElectedClusterManager()) {
    .....
   sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback);
 } else {
    processJoinRequest(joinRequest, joinCallback);
   }

Suggestion:
Ideally the decommissioning check should be executed at:

  1. handleJoinRequest in the Coordinator. This is a lightweight check, since the DiscoveryNode sends its attribute information during the join, so it rejects joins from a decommissioned node early on.
  2. The JoinTaskExecutor's submitted task as well, as that ensures correctness: it will never allow a node with the decommissioned attribute to join the cluster. (A sketch of such a check follows below.)
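
Note: a minimal sketch of the kind of check suggested here, written against the ensureNodeCommissioned(DiscoveryNode, Metadata) helper that appears later in this thread. The decommission-metadata accessors are assumptions for illustration, not the PR's exact code.

// Hedged sketch of the suggested commission check; only the ensureNodeCommissioned
// signature is taken from this PR, the metadata accessors are illustrative.
public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata) {
    DecommissionAttributeMetadata decommissionMetadata = metadata.custom(DecommissionAttributeMetadata.TYPE);
    if (decommissionMetadata != null && decommissionMetadata.decommissionAttribute() != null) {
        DecommissionAttribute attribute = decommissionMetadata.decommissionAttribute();
        // reject the join when the joining node carries the decommissioned attribute value
        if (attribute.attributeValue().equals(node.getAttributes().get(attribute.attributeName()))) {
            throw new NodeDecommissionedException(
                "node [{}] has the decommissioned attribute [{}]",
                node,
                attribute.attributeName()
            );
        }
    }
}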

imRishN (Member, Author):

In that case, can you please help me understand: when the current leader-eligible node is not the active leader and is just accumulating joins, do we simply skip the validator code flow?

imRishN (Member, Author), Oct 4, 2022:

> The above logic will not work if the current master-eligible node is not the active leader and is a candidate accumulating joins

Thanks for pointing this out, @shwetathareja. I'm implementing the changes as per your suggestions.

imRishN (Member, Author):

Made these changes.

  1. A check in Coordinator#handleJoinRequest (when the state doesn't have STATE_NOT_RECOVERED_BLOCK).
  2. A check in the execute method of the join task (see the sketch after this comment).
  3. Removed it from the built-in validators.

@shwetathareja, lmk if this looks ok.
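
Note: for point 2, a rough sketch of the execute-time guard; the surrounding loop paraphrases the join task executor's general shape (joiningNodes, nodesBuilder, results are assumed names) and is not the PR's verbatim code.

// Hedged sketch: fail a join during JoinTaskExecutor.execute(...) when the joining node is
// decommissioned, so such a node never makes it into the cluster state even if the early
// handleJoinRequest check was bypassed (e.g. joins accumulated while still a candidate).
for (final Task joinTask : joiningNodes) {
    final DiscoveryNode node = joinTask.node();
    try {
        ensureNodeCommissioned(node, currentState.metadata());
        nodesBuilder.add(node);
        results.success(joinTask);
    } catch (IllegalStateException | NodeDecommissionedException e) {
        results.failure(joinTask, e);
    }
}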

imRishN (Member, Author):

Hey, I explored join validators some more:

> During handleJoinRequest in the Coordinator. This is a lightweight check since the DiscoveryNode sends attribute information during the join. This will reject any joins from a decommissioned node early on.

I think we can keep it as a built-in join validator, since Coordinator#handleJoinRequest runs all of these validators anyway, so the join-request path is still covered. Another case where keeping it as a built-in validator would help is handlePublishRequest. Ideally we would never hit that case, as it runs only in Mode.Leader, but for any such cases in the future it would be good to have all the validators in one place (see the sketch below).
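
Note: a rough sketch of what keeping the check next to the other built-in validators could look like; this paraphrases the shape of JoinTaskExecutor.addBuiltInJoinValidators rather than quoting the PR, and the neighbouring compatibility checks are assumptions about the existing validator.

// Hedged sketch: register the commission check alongside the existing built-in join
// validators, so every caller of onJoinValidators (handleJoinRequest, handlePublishRequest)
// exercises it without a separate code path.
public static Collection<BiConsumer<DiscoveryNode, ClusterState>> addBuiltInJoinValidators(
    Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators
) {
    final Collection<BiConsumer<DiscoveryNode, ClusterState>> validators = new ArrayList<>();
    validators.add((node, state) -> {
        ensureNodesCompatibility(node.getVersion(), state.getNodes());
        ensureIndexCompatibility(node.getVersion(), state.getMetadata());
        ensureNodeCommissioned(node, state.getMetadata());
    });
    validators.addAll(onJoinValidators);
    return Collections.unmodifiableCollection(validators);
}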

shwetathareja (Member):

The built-in validator delays a check that can be done early on, during handleJoinRequest itself. Can you help me understand the handlePublishRequest scenario?

imRishN (Member, Author):

transportService.connectToNode(joinRequest.getSourceNode(), ActionListener.wrap(ignore -> {
            final ClusterState stateForJoinValidation = getStateForClusterManagerService();

            if (stateForJoinValidation.nodes().isLocalNodeElectedClusterManager()) {
                onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation));
                if (stateForJoinValidation.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
                    // we do this in a couple of places including the cluster update thread. This one here is really just best effort
                    // to ensure we fail as fast as possible.
                    JoinTaskExecutor.ensureMajorVersionBarrier(
                        joinRequest.getSourceNode().getVersion(),
                        stateForJoinValidation.getNodes().getMinNodeVersion()
                    );
                    // we are checking source node commission status here to reject any join request coming from a decommissioned node
                    // even before executing the join task to fail fast
                    JoinTaskExecutor.ensureNodeCommissioned(joinRequest.getSourceNode(), stateForJoinValidation.metadata());
                }
                sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback);
            } else {
                processJoinRequest(joinRequest, joinCallback);
            }
        }, joinCallback::onFailure));

This is the snippet from Coordinator#handleJoinRequest. As you can see, this method also runs the same built-in validators via onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation)), so the concern about checking early is already addressed here.

imRishN (Member, Author):

We might never actually hit the handlePublishRequest scenario, as it runs only in Leader mode. What I was trying to say is that plugins put their join validators in onJoinValidators, the Coordinator adds all the other validators to the same collection, and then both handlePublishRequest and handleJoinRequest validate with whatever onJoinValidators holds. Hence, keeping all the validators centrally in onJoinValidators might be good.

shwetathareja (Member):

handlePublishRequest is executed on the node joining the master, but with the check inside handleJoinRequest the master will reject such joins upfront.

imRishN (Member, Author), Oct 7, 2022:

Correct. I meant that the built-in validators are also executed in handleJoinRequest.

(End of review thread; the diff continues below.)

logger.info("local node is decommissioned. Will not be able to join the cluster");
nodeCommissionedListener.onFailure(exp);
}
onCompletion.run();
}

ClusterSettings.java
@@ -534,6 +534,7 @@ public void apply(Settings value, Settings current, Settings previous) {
PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING,
PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING,
PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING,
ClusterFormationFailureHelper.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING,
ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING,
35 changes: 34 additions & 1 deletion server/src/main/java/org/opensearch/discovery/PeerFinder.java
@@ -84,14 +84,23 @@ public abstract class PeerFinder {
Setting.Property.NodeScope
);

// the time between attempts to find all peers when node is in decommissioned state, default set to 3 minutes
public static final Setting<TimeValue> DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING = Setting.timeSetting(
"discovery.find_peers_interval_during_decommission",
TimeValue.timeValueMinutes(3L),
TimeValue.timeValueMillis(1000),
Setting.Property.NodeScope
);

public static final Setting<TimeValue> DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING = Setting.timeSetting(
"discovery.request_peers_timeout",
TimeValue.timeValueMillis(3000),
TimeValue.timeValueMillis(1),
Setting.Property.NodeScope
);

private final TimeValue findPeersInterval;
private final Settings settings;
private TimeValue findPeersInterval;
private final TimeValue requestPeersTimeout;

private final Object mutex = new Object();
@@ -101,6 +110,7 @@ public abstract class PeerFinder {

private volatile long currentTerm;
private boolean active;
private boolean localNodeDecommissioned = false;
private DiscoveryNodes lastAcceptedNodes;
private final Map<TransportAddress, Peer> peersByAddress = new LinkedHashMap<>();
private Optional<DiscoveryNode> leader = Optional.empty();
@@ -112,6 +122,7 @@ public PeerFinder(
TransportAddressConnector transportAddressConnector,
ConfiguredHostsResolver configuredHostsResolver
) {
this.settings = settings;
findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings);
this.transportService = transportService;
@@ -128,6 +139,28 @@ public PeerFinder(
);
}

public ActionListener<Void> nodeCommissionedListener() {
return new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
localNodeDecommissioned = false;
findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
logger.info("updated findPeersInterval to [{}] as node is commissioned", findPeersInterval);
}

@Override
public void onFailure(Exception e) {
localNodeDecommissioned = true;
findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING.get(settings);
logger.info("updated findPeersInterval to [{}] as node is decommissioned", findPeersInterval);
}
};
}

public boolean localNodeDecommissioned() {
return localNodeDecommissioned;
}

public void activate(final DiscoveryNodes lastAcceptedNodes) {
logger.trace("activating with {}", lastAcceptedNodes);

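Note on the PeerFinder change above: discovery.find_peers_interval_during_decommission is a node-scope setting, so it can be tuned like the existing find-peers interval. A hedged sketch with illustrative values (only the setting keys come from this diff; the PR's default for the decommission interval is 3 minutes):

// Illustrative only: keep the normal peer-finding cadence at 1s, but back off to 2m once
// the local node learns it has been decommissioned.
Settings nodeSettings = Settings.builder()
    .put(PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING.getKey(), "1s")
    .put(PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING.getKey(), "2m")
    .build();
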
JoinHelperTests.java
@@ -33,6 +33,7 @@

import org.apache.logging.log4j.Level;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.ClusterName;
@@ -55,6 +56,7 @@
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import static org.mockito.Mockito.mock;
import static org.opensearch.monitor.StatusInfo.Status.HEALTHY;
import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY;
import static org.opensearch.node.Node.NODE_NAME_SETTING;
@@ -90,7 +92,8 @@ public void testJoinDeduplication() {
startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(),
(s, p, r) -> {},
() -> new StatusInfo(HEALTHY, "info")
() -> new StatusInfo(HEALTHY, "info"),
mock(ActionListener.class)
);
transportService.start();

@@ -230,7 +233,8 @@ private void assertJoinValidationRejectsMismatchedClusterUUID(String actionName,
startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(),
(s, p, r) -> {},
null
null,
mock(ActionListener.class)
); // registers request handler
transportService.start();
transportService.acceptIncomingRequests();
@@ -284,7 +288,8 @@ public void testJoinFailureOnUnhealthyNodes() {
startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(),
(s, p, r) -> {},
() -> nodeHealthServiceStatus.get()
() -> nodeHealthServiceStatus.get(),
mock(ActionListener.class)
);
transportService.start();

PeerFinderTests.java
@@ -807,6 +807,14 @@ public void testReconnectsToDisconnectedNodes() {
assertFoundPeers(rebootedOtherNode);
}

public void testNodeCommissioning() {
peerFinder.nodeCommissionedListener().onFailure(new Exception("unit-test"));
assertTrue(peerFinder.localNodeDecommissioned());

peerFinder.nodeCommissionedListener().onResponse(null);
assertFalse(peerFinder.localNodeDecommissioned());
}

private void respondToRequests(Function<DiscoveryNode, PeersResponse> responseFactory) {
final CapturedRequest[] capturedRequests = capturingTransport.getCapturedRequestsAndClear();
for (final CapturedRequest capturedRequest : capturedRequests) {