Skip to content

Commit

Permalink
NIFI-13649 Ensure cluster node API address is reachable before markin…
Browse files Browse the repository at this point in the history
…g CONNECTED
  • Loading branch information
bbende committed Aug 9, 2024
1 parent 920edfc commit d0c8a9d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {

private static final Duration NODE_API_TIMEOUT = Duration.ofSeconds(10);

private final int heartbeatIntervalMillis;
private final int missableHeartbeatCount;
private static final Logger logger = LoggerFactory.getLogger(AbstractHeartbeatMonitor.class);
Expand Down Expand Up @@ -288,6 +293,11 @@ private void processHeartbeat(final NodeHeartbeat heartbeat) {
return;
}

if (!isNodeApiReachable(nodeId)) {
logger.info("Received a connection request from {}, but the node's API address is not reachable, will not connect node to the cluster yet", nodeId);
return;
}

// connection complete
clusterCoordinator.finishNodeConnection(nodeId);
clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received first heartbeat from connecting node. Node connected.");
Expand All @@ -296,6 +306,16 @@ private void processHeartbeat(final NodeHeartbeat heartbeat) {
clusterCoordinator.validateHeartbeat(heartbeat);
}

private boolean isNodeApiReachable(final NodeIdentifier nodeIdentifier) {
try {
final InetAddress nodeApiAddress = InetAddress.getByName(nodeIdentifier.getApiAddress());
return nodeApiAddress.isReachable((int) NODE_API_TIMEOUT.toMillis());
} catch (final Exception e) {
logger.debug("Node is not reachable at API address {}", nodeIdentifier.getApiAddress(), e);
return false;
}
}

/**
* @return the most recent heartbeat information for each node in the
* cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ public void run() {
case CLUSTER_STATUS:
logger.info("Received CLUSTER_STATUS request from Bootstrap");
final String clusterStatus = getClusterStatus();
logger.debug("Responding to CLUSTER_STATUS request from Bootstrap with {}", clusterStatus);
sendAnswer(socket.getOutputStream(), clusterStatus);
break;
case DECOMMISSION:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
Expand Down Expand Up @@ -398,8 +400,8 @@ boolean isReplicateRequest() {

ensureFlowInitialized();

// If not connected to the cluster, we do not replicate
if (!isConnectedToCluster()) {
// If not connected or not connecting to the cluster, we do not replicate
if (!isConnectedToCluster() && !isConnectingToCluster()) {
return false;
}

Expand Down Expand Up @@ -1054,6 +1056,16 @@ boolean isConnectedToCluster() {
return isClustered() && clusterCoordinator.isConnected();
}

boolean isConnectingToCluster() {
if (!isClustered()) {
return false;
}

final NodeIdentifier nodeId = clusterCoordinator.getLocalNodeIdentifier();
final NodeConnectionStatus nodeConnectionStatus = clusterCoordinator.getConnectionStatus(nodeId);
return nodeConnectionStatus != null && nodeConnectionStatus.getState() == NodeConnectionState.CONNECTING;
}

boolean isClustered() {
return clusterCoordinator != null;
}
Expand Down

0 comments on commit d0c8a9d

Please sign in to comment.