-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
avoid repeat connections in pings every round #21812
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ | |
import org.elasticsearch.cluster.node.DiscoveryNodes; | ||
import org.elasticsearch.common.Nullable; | ||
import org.elasticsearch.common.UUIDs; | ||
import org.elasticsearch.common.collect.Tuple; | ||
import org.elasticsearch.common.component.AbstractComponent; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
|
@@ -292,24 +293,13 @@ Collection<PingResponse> pingAndWait(TimeValue duration) { | |
*/ | ||
@Override | ||
public void ping(final PingListener listener, final TimeValue duration) { | ||
final List<DiscoveryNode> resolvedDiscoveryNodes; | ||
try { | ||
resolvedDiscoveryNodes = resolveDiscoveryNodes( | ||
unicastZenPingExecutorService, | ||
logger, | ||
configuredHosts, | ||
limitPortCounts, | ||
transportService, | ||
() -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#", | ||
resolveTimeout); | ||
} catch (InterruptedException e) { | ||
throw new RuntimeException(e); | ||
} | ||
final HashSet<DiscoveryNode> nodesToPing = buildNodesToPing(); | ||
|
||
final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingHandlerIdGenerator.incrementAndGet()); | ||
try { | ||
receivedResponses.put(sendPingsHandler.id(), sendPingsHandler); | ||
try { | ||
sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes); | ||
sendPings(duration, null, sendPingsHandler, nodesToPing); | ||
} catch (RejectedExecutionException e) { | ||
logger.debug("Ping execution rejected", e); | ||
// The RejectedExecutionException can come from the fact unicastZenPingExecutorService is at its max down in sendPings | ||
|
@@ -319,11 +309,12 @@ public void ping(final PingListener listener, final TimeValue duration) { | |
threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() { | ||
@Override | ||
protected void doRun() { | ||
sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes); | ||
sendPings(duration, null, sendPingsHandler, nodesToPing); | ||
threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() { | ||
@Override | ||
protected void doRun() throws Exception { | ||
sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), sendPingsHandler, resolvedDiscoveryNodes); | ||
sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), | ||
sendPingsHandler, nodesToPing); | ||
sendPingsHandler.close(); | ||
listener.onPing(sendPingsHandler.pingCollection().toList()); | ||
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) { | ||
|
@@ -392,35 +383,24 @@ void sendPings( | |
final TimeValue timeout, | ||
@Nullable TimeValue waitTime, | ||
final SendPingsHandler sendPingsHandler, | ||
final List<DiscoveryNode> resolvedDiscoveryNodes) { | ||
final HashSet<DiscoveryNode> initialNodesToPingSet) { | ||
final UnicastPingRequest pingRequest = new UnicastPingRequest(); | ||
pingRequest.id = sendPingsHandler.id(); | ||
pingRequest.timeout = timeout; | ||
DiscoveryNodes discoNodes = contextProvider.nodes(); | ||
|
||
pingRequest.pingResponse = createPingResponse(discoNodes); | ||
|
||
HashSet<DiscoveryNode> nodesToPingSet = new HashSet<>(); | ||
for (PingResponse temporalResponse : temporalResponses) { | ||
// Only send pings to nodes that have the same cluster name. | ||
if (clusterName.equals(temporalResponse.clusterName())) { | ||
nodesToPingSet.add(temporalResponse.node()); | ||
} | ||
} | ||
nodesToPingSet.addAll(hostsProvider.buildDynamicNodes()); | ||
HashSet<DiscoveryNode> nodesToPingSet = new HashSet<>(initialNodesToPingSet); | ||
|
||
// add all possible master nodes that were active in the last known cluster configuration | ||
for (ObjectCursor<DiscoveryNode> masterNode : discoNodes.getMasterNodes().values()) { | ||
nodesToPingSet.add(masterNode.value); | ||
} | ||
// Only send pings to nodes that have the same cluster name. | ||
Set<DiscoveryNode> sameNameNodes = temporalResponses.stream() | ||
.filter(temporalResponse -> clusterName.equals(temporalResponse.clusterName())) | ||
.map(PingResponse::node).collect(Collectors.toSet()); | ||
nodesToPingSet.addAll(sameNameNodes); | ||
|
||
// sort the nodes by likelihood of being an active master | ||
List<DiscoveryNode> sortedNodesToPing = ElectMasterService.sortByMasterLikelihood(nodesToPingSet); | ||
|
||
// add the configured hosts first | ||
final List<DiscoveryNode> nodesToPing = new ArrayList<>(resolvedDiscoveryNodes.size() + sortedNodesToPing.size()); | ||
nodesToPing.addAll(resolvedDiscoveryNodes); | ||
nodesToPing.addAll(sortedNodesToPing); | ||
List<DiscoveryNode> nodesToPing = ElectMasterService.sortByMasterLikelihood(nodesToPingSet); | ||
|
||
final CountDownLatch latch = new CountDownLatch(nodesToPing.size()); | ||
for (final DiscoveryNode node : nodesToPing) { | ||
|
@@ -518,6 +498,33 @@ public void run() { | |
} | ||
} | ||
|
||
private HashSet<DiscoveryNode> buildNodesToPing() { | ||
final List<DiscoveryNode> resolvedDiscoveryNodes; | ||
try { | ||
resolvedDiscoveryNodes = resolveDiscoveryNodes( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can add the return value of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @bleskes , if we add |
||
unicastZenPingExecutorService, | ||
logger, | ||
configuredHosts, | ||
limitPortCounts, | ||
transportService, | ||
() -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#", | ||
resolveTimeout); | ||
} catch (InterruptedException e) { | ||
throw new RuntimeException(e); | ||
} | ||
|
||
HashSet<DiscoveryNode> nodesToPingSet = new HashSet<>(resolvedDiscoveryNodes); | ||
|
||
nodesToPingSet.addAll(hostsProvider.buildDynamicNodes()); | ||
|
||
// add all possible master nodes that were active in the last known cluster configuration | ||
for (ObjectCursor<DiscoveryNode> masterNode : contextProvider.nodes().getMasterNodes().values()) { | ||
nodesToPingSet.add(masterNode.value); | ||
} | ||
|
||
return nodesToPingSet; | ||
} | ||
|
||
private void sendPingRequestToNode(final int id, final TimeValue timeout, final UnicastPingRequest pingRequest, | ||
final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) { | ||
logger.trace("[{}] sending to {}", id, nodeToSend); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this part has to stay here - we want to extend our pinging as we learn of new nodes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay, I mv back this. Thanks