Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avoid repeat connections in pings every round #21812

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -292,24 +293,13 @@ Collection<PingResponse> pingAndWait(TimeValue duration) {
*/
@Override
public void ping(final PingListener listener, final TimeValue duration) {
final List<DiscoveryNode> resolvedDiscoveryNodes;
try {
resolvedDiscoveryNodes = resolveDiscoveryNodes(
unicastZenPingExecutorService,
logger,
configuredHosts,
limitPortCounts,
transportService,
() -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#",
resolveTimeout);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
final Tuple<List<DiscoveryNode>, HashSet<DiscoveryNode>> nodesToPing = buildNodesToPing();

final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingHandlerIdGenerator.incrementAndGet());
try {
receivedResponses.put(sendPingsHandler.id(), sendPingsHandler);
try {
sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes);
sendPings(duration, null, sendPingsHandler, nodesToPing);
} catch (RejectedExecutionException e) {
logger.debug("Ping execution rejected", e);
// The RejectedExecutionException can come from the fact unicastZenPingExecutorService is at its max down in sendPings
Expand All @@ -319,11 +309,12 @@ public void ping(final PingListener listener, final TimeValue duration) {
threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
protected void doRun() {
sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes);
sendPings(duration, null, sendPingsHandler, nodesToPing);
threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), sendPingsHandler, resolvedDiscoveryNodes);
sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2),
sendPingsHandler, nodesToPing);
sendPingsHandler.close();
listener.onPing(sendPingsHandler.pingCollection().toList());
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
Expand Down Expand Up @@ -392,34 +383,28 @@ void sendPings(
final TimeValue timeout,
@Nullable TimeValue waitTime,
final SendPingsHandler sendPingsHandler,
final List<DiscoveryNode> resolvedDiscoveryNodes) {
final Tuple<List<DiscoveryNode>, HashSet<DiscoveryNode>> initialNodesToPingSet) {
final UnicastPingRequest pingRequest = new UnicastPingRequest();
pingRequest.id = sendPingsHandler.id();
pingRequest.timeout = timeout;
DiscoveryNodes discoNodes = contextProvider.nodes();

pingRequest.pingResponse = createPingResponse(discoNodes);

HashSet<DiscoveryNode> nodesToPingSet = new HashSet<>();
for (PingResponse temporalResponse : temporalResponses) {
// Only send pings to nodes that have the same cluster name.
if (clusterName.equals(temporalResponse.clusterName())) {
nodesToPingSet.add(temporalResponse.node());
}
}
nodesToPingSet.addAll(hostsProvider.buildDynamicNodes());
HashSet<DiscoveryNode> nodesToPingSet = new HashSet<>(initialNodesToPingSet.v2());

// add all possible master nodes that were active in the last known cluster configuration
for (ObjectCursor<DiscoveryNode> masterNode : discoNodes.getMasterNodes().values()) {
nodesToPingSet.add(masterNode.value);
}
// Only send pings to nodes that have the same cluster name.
Set<DiscoveryNode> sameNameNodes = temporalResponses.stream()
.filter(temporalResponse -> clusterName.equals(temporalResponse.clusterName()))
.map(PingResponse::node).collect(Collectors.toSet());
nodesToPingSet.addAll(sameNameNodes);

// sort the nodes by likelihood of being an active master
List<DiscoveryNode> sortedNodesToPing = ElectMasterService.sortByMasterLikelihood(nodesToPingSet);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this part has to stay here - we want to extend our pinging as we learn of new nodes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I'll move this back. Thanks.

// add the configured hosts first
final List<DiscoveryNode> nodesToPing = new ArrayList<>(resolvedDiscoveryNodes.size() + sortedNodesToPing.size());
nodesToPing.addAll(resolvedDiscoveryNodes);
final List<DiscoveryNode> nodesToPing = new ArrayList<>(initialNodesToPingSet.v1().size() + sortedNodesToPing.size());
nodesToPing.addAll(initialNodesToPingSet.v1());
nodesToPing.addAll(sortedNodesToPing);

final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
Expand Down Expand Up @@ -518,6 +503,33 @@ public void run() {
}
}

private Tuple<List<DiscoveryNode>, HashSet<DiscoveryNode>> buildNodesToPing() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we typically don't like using tuple return values. In this case I think we can just return a HashSet.

HashSet<DiscoveryNode> nodesToPingSet = new HashSet<>();

nodesToPingSet.addAll(hostsProvider.buildDynamicNodes());

// add all possible master nodes that were active in the last known cluster configuration
for (ObjectCursor<DiscoveryNode> masterNode : contextProvider.nodes().getMasterNodes().values()) {
nodesToPingSet.add(masterNode.value);
}

final List<DiscoveryNode> resolvedDiscoveryNodes;
try {
resolvedDiscoveryNodes = resolveDiscoveryNodes(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can add the return value of resolveDiscoveryNodes to the HashSet being built

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @bleskes , if we add resolveDiscoveryNodes, we can't guarantee configured hosts first ping, Does it make sense?

unicastZenPingExecutorService,
logger,
configuredHosts,
limitPortCounts,
transportService,
() -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#",
resolveTimeout);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

return Tuple.tuple(resolvedDiscoveryNodes, nodesToPingSet);
}

private void sendPingRequestToNode(final int id, final TimeValue timeout, final UnicastPingRequest pingRequest,
final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) {
logger.trace("[{}] sending to {}", id, nodeToSend);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,11 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

Expand Down Expand Up @@ -252,6 +254,151 @@ public ClusterState clusterState() {
assertCounters(handleD, handleA, handleB, handleC, handleD);
}

/**
 * Verifies that {@code UnicastHostsProvider#buildDynamicNodes()} is invoked exactly once per
 * pinging round, even though a round issues multiple sendPings calls internally.
 */
public void testHostProviderBuildDynamicNodesShouldBeCalledOnlyOnceInEachRound() throws IOException, InterruptedException {
final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build();

NetworkService networkService = new NetworkService(settings, Collections.emptyList());

// factory for a real (mock TCP) transport bound to an ephemeral port
final BiFunction<Settings, Version, Transport> supplier = (s, v) -> new MockTcpTransport(
s,
threadPool,
BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(),
new NamedWriteableRegistry(Collections.emptyList()),
networkService,
v);

NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier);
closeables.push(handleA.transportService);

final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomPositiveLong()).build();

// configure node A to ping itself via the unicast hosts list
Settings hostsSettings = Settings.builder()
.putArray("discovery.zen.ping.unicast.hosts",
NetworkAddress.format(new InetSocketAddress(handleA.address.address().getAddress(), handleA.address.address().getPort())))
.put("cluster.name", "test")
.build();

// mock hosts provider so the number of buildDynamicNodes() invocations can be verified
UnicastHostsProvider unicastHostsProviderA = mock(UnicastHostsProvider.class);
UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, handleA.transportService, unicastHostsProviderA);
zenPingA.start(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A").build();
}

@Override
public ClusterState clusterState() {
// state-not-recovered block makes the node behave as if it has no master yet
return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build();
}
});
closeables.push(zenPingA);

// one full pinging round internally schedules several sendPings calls; the provider
// must still be consulted only once for the whole round
zenPingA.pingAndWait(TimeValue.timeValueSeconds(1));
verify(unicastHostsProviderA, times(1)).buildDynamicNodes();
}

public void testShouldPingDiscoveredNodes() throws IOException, InterruptedException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we call this testDiscoveryOfNoneConfiguredNodes and add a comment: test that nodes discover each other if they ping a common host ?

This doesn't test what I intended - i.e., test that a node pings nodes that have pinged it while it was pinging. That is however non trivial to test and I don't want to delay your change for setting this test up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks your feedback, I will take time to see how to this test, if I have any progress on this, I will create a new PR for this. :)

final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build();

NetworkService networkService = new NetworkService(settings, Collections.emptyList());

final BiFunction<Settings, Version, Transport> supplier = (s, v) -> new MockTcpTransport(
s,
threadPool,
BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(),
new NamedWriteableRegistry(Collections.emptyList()),
networkService,
v);

NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier);
closeables.push(handleA.transportService);
NetworkHandle handleB = startServices(settings, threadPool, "UZP_B", Version.CURRENT, supplier);
closeables.push(handleB.transportService);
NetworkHandle handleC = startServices(settings, threadPool, "UZP_C", Version.CURRENT, supplier);
closeables.push(handleC.transportService);

final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomPositiveLong()).build();

Settings hostsSettingsA = Settings.builder()
.putArray("discovery.zen.ping.unicast.hosts",
NetworkAddress.format(new InetSocketAddress(handleA.address.address().getAddress(), handleA.address.address().getPort())),
NetworkAddress.format(new InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort())))
.put("cluster.name", "test")
.build();

UnicastZenPing zenPingA = new UnicastZenPing(hostsSettingsA, threadPool, handleA.transportService, EMPTY_HOSTS_PROVIDER);
zenPingA.start(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A").build();
}

@Override
public ClusterState clusterState() {
return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build();
}
});
closeables.push(zenPingA);

Settings hostsSettingsB = Settings.builder()
.putArray("discovery.zen.ping.unicast.hosts",
NetworkAddress.format(new InetSocketAddress(handleA.address.address().getAddress(), handleA.address.address().getPort())),
NetworkAddress.format(new InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort())),
NetworkAddress.format(new InetSocketAddress(handleC.address.address().getAddress(), handleC.address.address().getPort())))
.put("cluster.name", "test")
.build();
UnicastZenPing zenPingB = new UnicastZenPing(hostsSettingsB, threadPool, handleB.transportService, EMPTY_HOSTS_PROVIDER);
zenPingB.start(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B").build();
}

@Override
public ClusterState clusterState() {
return state;
}
});
closeables.push(zenPingB);

Settings hostsSettingsC = Settings.builder()
.putArray("discovery.zen.ping.unicast.hosts",
NetworkAddress.format(new InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort())),
NetworkAddress.format(new InetSocketAddress(handleC.address.address().getAddress(), handleC.address.address().getPort())))
.put("cluster.name", "test")
.build();
UnicastZenPing zenPingC = new UnicastZenPing(hostsSettingsC, threadPool, handleC.transportService, EMPTY_HOSTS_PROVIDER);
zenPingC.start(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C").build();
}

@Override
public ClusterState clusterState() {
return state;
}
});
closeables.push(zenPingC);


logger.info("ping from UZP_C");
Collection<ZenPing.PingResponse> pingResponsesC = zenPingC.pingAndWait(TimeValue.timeValueSeconds(1));
assertThat(pingResponsesC.size(), equalTo(1));
ZenPing.PingResponse pingC = pingResponsesC.iterator().next();
assertThat(pingC.node().getId(), equalTo("UZP_B"));

logger.info("ping from UZP_A");
Collection<ZenPing.PingResponse> pingResponsesA = zenPingA.pingAndWait(TimeValue.timeValueSeconds(1));
assertThat(pingResponsesA.size(), equalTo(2));
List<String> responseIds =
pingResponsesA.stream().map(response -> response.node().getId()).collect(Collectors.toList());
assertThat(responseIds, hasItem("UZP_B"));
assertThat(responseIds, hasItem("UZP_C"));
}

public void testUnknownHostNotCached() {
// use ephemeral ports
final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build();
Expand Down