From 7a3deeb969f59fa1d29bd63eaea411687e55061b Mon Sep 17 00:00:00 2001 From: Xiang Chen Date: Sun, 27 Nov 2016 14:58:34 +0800 Subject: [PATCH 1/3] avoid repeat connections in pings every round --- .../discovery/zen/UnicastZenPing.java | 46 ++++++++++--------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index eec9548dd08ac..4e19bb2aefa3e 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -292,6 +292,7 @@ Collection pingAndWait(TimeValue duration) { */ @Override public void ping(final PingListener listener, final TimeValue duration) { + final List sortedNodesToPing = buildNodesToPing(); final List resolvedDiscoveryNodes; try { resolvedDiscoveryNodes = resolveDiscoveryNodes( @@ -309,7 +310,7 @@ public void ping(final PingListener listener, final TimeValue duration) { try { receivedResponses.put(sendPingsHandler.id(), sendPingsHandler); try { - sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes); + sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes, sortedNodesToPing); } catch (RejectedExecutionException e) { logger.debug("Ping execution rejected", e); // The RejectedExecutionException can come from the fact unicastZenPingExecutorService is at its max down in sendPings @@ -319,11 +320,12 @@ public void ping(final PingListener listener, final TimeValue duration) { threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() { @Override protected void doRun() { - sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes); + sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes, sortedNodesToPing); threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() { @Override protected void doRun() throws Exception { - sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), sendPingsHandler, resolvedDiscoveryNodes); + sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), + sendPingsHandler, resolvedDiscoveryNodes, sortedNodesToPing); sendPingsHandler.close(); listener.onPing(sendPingsHandler.pingCollection().toList()); for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) { @@ -392,7 +394,7 @@ void sendPings( final TimeValue timeout, @Nullable TimeValue waitTime, final SendPingsHandler sendPingsHandler, - final List resolvedDiscoveryNodes) { + final List resolvedDiscoveryNodes, final List sortedNodesToPing) { final UnicastPingRequest pingRequest = new UnicastPingRequest(); pingRequest.id = sendPingsHandler.id(); pingRequest.timeout = timeout; @@ -400,23 +402,6 @@ void sendPings( pingRequest.pingResponse = createPingResponse(discoNodes); - HashSet nodesToPingSet = new HashSet<>(); - for (PingResponse temporalResponse : temporalResponses) { - // Only send pings to nodes that have the same cluster name. - if (clusterName.equals(temporalResponse.clusterName())) { - nodesToPingSet.add(temporalResponse.node()); - } - } - nodesToPingSet.addAll(hostsProvider.buildDynamicNodes()); - - // add all possible master nodes that were active in the last known cluster configuration - for (ObjectCursor masterNode : discoNodes.getMasterNodes().values()) { - nodesToPingSet.add(masterNode.value); - } - - // sort the nodes by likelihood of being an active master - List sortedNodesToPing = ElectMasterService.sortByMasterLikelihood(nodesToPingSet); - // add the configured hosts first final List nodesToPing = new ArrayList<>(resolvedDiscoveryNodes.size() + sortedNodesToPing.size()); nodesToPing.addAll(resolvedDiscoveryNodes); @@ -518,6 +503,25 @@ public void run() { } } + private List buildNodesToPing() { + HashSet nodesToPingSet = new HashSet<>(); + for (PingResponse temporalResponse : temporalResponses) { + // Only send pings to nodes that have the same cluster name. + if (clusterName.equals(temporalResponse.clusterName())) { + nodesToPingSet.add(temporalResponse.node()); + } + } + nodesToPingSet.addAll(hostsProvider.buildDynamicNodes()); + + // add all possible master nodes that were active in the last known cluster configuration + for (ObjectCursor masterNode : contextProvider.nodes().getMasterNodes().values()) { + nodesToPingSet.add(masterNode.value); + } + + // sort the nodes by likelihood of being an active master + return ElectMasterService.sortByMasterLikelihood(nodesToPingSet); + } + private void sendPingRequestToNode(final int id, final TimeValue timeout, final UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) { logger.trace("[{}] sending to {}", id, nodeToSend); From 3114c21fd7cae764a75f17b6c07f42f3374c0ae9 Mon Sep 17 00:00:00 2001 From: Xiang Chen Date: Mon, 28 Nov 2016 01:03:42 +0800 Subject: [PATCH 2/3] mv temporalResponses to sendPings method for finding new nodes in each sendPings, and add unihostprovider test, pick up discovered node test --- .../discovery/zen/UnicastZenPing.java | 66 ++++---- .../discovery/zen/UnicastZenPingTests.java | 147 ++++++++++++++++++ 2 files changed, 184 insertions(+), 29 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index 4e19bb2aefa3e..1abcd52933346 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -292,25 +293,13 @@ Collection pingAndWait(TimeValue duration) { */ @Override public void ping(final PingListener listener, final TimeValue duration) { - final List sortedNodesToPing = buildNodesToPing(); - final List resolvedDiscoveryNodes; - try { - resolvedDiscoveryNodes = resolveDiscoveryNodes( - unicastZenPingExecutorService, - logger, - configuredHosts, - limitPortCounts, - transportService, - () -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#", - resolveTimeout); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + final Tuple, HashSet> nodesToPing = buildNodesToPing(); + final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingHandlerIdGenerator.incrementAndGet()); try { receivedResponses.put(sendPingsHandler.id(), sendPingsHandler); try { - sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes, sortedNodesToPing); + sendPings(duration, null, sendPingsHandler, nodesToPing); } catch (RejectedExecutionException e) { logger.debug("Ping execution rejected", e); // The RejectedExecutionException can come from the fact unicastZenPingExecutorService is at its max down in sendPings @@ -320,12 +309,12 @@ public void ping(final PingListener listener, final TimeValue duration) { threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() { @Override protected void doRun() { - sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes, sortedNodesToPing); + sendPings(duration, null, sendPingsHandler, nodesToPing); threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() { @Override protected void doRun() throws Exception { sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), - sendPingsHandler, resolvedDiscoveryNodes, sortedNodesToPing); + sendPingsHandler, nodesToPing); sendPingsHandler.close(); listener.onPing(sendPingsHandler.pingCollection().toList()); for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) { @@ -394,7 +383,7 @@ void sendPings( final TimeValue timeout, @Nullable TimeValue waitTime, final SendPingsHandler sendPingsHandler, - final List resolvedDiscoveryNodes, final List sortedNodesToPing) { + final Tuple, HashSet> initialNodesToPingSet) { final UnicastPingRequest pingRequest = new UnicastPingRequest(); pingRequest.id = sendPingsHandler.id(); pingRequest.timeout = timeout; @@ -402,9 +391,20 @@ void sendPings( pingRequest.pingResponse = createPingResponse(discoNodes); + HashSet nodesToPingSet = new HashSet<>(initialNodesToPingSet.v2()); + + // Only send pings to nodes that have the same cluster name. + Set sameNameNodes = temporalResponses.stream() + .filter(temporalResponse -> clusterName.equals(temporalResponse.clusterName())) + .map(PingResponse::node).collect(Collectors.toSet()); + nodesToPingSet.addAll(sameNameNodes); + + // sort the nodes by likelihood of being an active master + List sortedNodesToPing = ElectMasterService.sortByMasterLikelihood(nodesToPingSet); + // add the configured hosts first - final List nodesToPing = new ArrayList<>(resolvedDiscoveryNodes.size() + sortedNodesToPing.size()); - nodesToPing.addAll(resolvedDiscoveryNodes); + final List nodesToPing = new ArrayList<>(initialNodesToPingSet.v1().size() + sortedNodesToPing.size()); + nodesToPing.addAll(initialNodesToPingSet.v1()); nodesToPing.addAll(sortedNodesToPing); final CountDownLatch latch = new CountDownLatch(nodesToPing.size()); @@ -503,14 +503,9 @@ public void run() { } } - private List buildNodesToPing() { + private Tuple, HashSet> buildNodesToPing() { HashSet nodesToPingSet = new HashSet<>(); - for (PingResponse temporalResponse : temporalResponses) { - // Only send pings to nodes that have the same cluster name. - if (clusterName.equals(temporalResponse.clusterName())) { - nodesToPingSet.add(temporalResponse.node()); - } - } + nodesToPingSet.addAll(hostsProvider.buildDynamicNodes()); // add all possible master nodes that were active in the last known cluster configuration @@ -518,8 +513,21 @@ private List buildNodesToPing() { nodesToPingSet.add(masterNode.value); } - // sort the nodes by likelihood of being an active master - return ElectMasterService.sortByMasterLikelihood(nodesToPingSet); + final List resolvedDiscoveryNodes; + try { + resolvedDiscoveryNodes = resolveDiscoveryNodes( + unicastZenPingExecutorService, + logger, + configuredHosts, + limitPortCounts, + transportService, + () -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#", + resolveTimeout); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + return Tuple.tuple(resolvedDiscoveryNodes, nodesToPingSet); } private void sendPingRequestToNode(final int id, final TimeValue timeout, final UnicastPingRequest pingRequest, diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index 5a91426d6bcdf..c361ff4d08f4e 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -81,9 +81,11 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -252,6 +254,151 @@ public ClusterState clusterState() { assertCounters(handleD, handleA, handleB, handleC, handleD); } + public void testHostProviderBuildDynamicNodesShouldBeCalledOnlyOnceInEachRound() throws IOException, InterruptedException { + final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build(); + + NetworkService networkService = new NetworkService(settings, Collections.emptyList()); + + final BiFunction supplier = (s, v) -> new MockTcpTransport( + s, + threadPool, + BigArrays.NON_RECYCLING_INSTANCE, + new NoneCircuitBreakerService(), + new NamedWriteableRegistry(Collections.emptyList()), + networkService, + v); + + NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier); + closeables.push(handleA.transportService); + + final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomPositiveLong()).build(); + + Settings hostsSettings = Settings.builder() + .putArray("discovery.zen.ping.unicast.hosts", + NetworkAddress.format(new InetSocketAddress(handleA.address.address().getAddress(), handleA.address.address().getPort()))) + .put("cluster.name", "test") + .build(); + + UnicastHostsProvider unicastHostsProviderA = mock(UnicastHostsProvider.class); + UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, handleA.transportService, unicastHostsProviderA); + zenPingA.start(new PingContextProvider() { + @Override + public DiscoveryNodes nodes() { + return DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A").build(); + } + + @Override + public ClusterState clusterState() { + return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build(); + } + }); + closeables.push(zenPingA); + + zenPingA.pingAndWait(TimeValue.timeValueSeconds(1)); + verify(unicastHostsProviderA, times(1)).buildDynamicNodes(); + } + + public void testShouldPingDiscoveredNodes() throws IOException, InterruptedException { + final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build(); + + NetworkService networkService = new NetworkService(settings, Collections.emptyList()); + + final BiFunction supplier = (s, v) -> new MockTcpTransport( + s, + threadPool, + BigArrays.NON_RECYCLING_INSTANCE, + new NoneCircuitBreakerService(), + new NamedWriteableRegistry(Collections.emptyList()), + networkService, + v); + + NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier); + closeables.push(handleA.transportService); + NetworkHandle handleB = startServices(settings, threadPool, "UZP_B", Version.CURRENT, supplier); + closeables.push(handleB.transportService); + NetworkHandle handleC = startServices(settings, threadPool, "UZP_C", Version.CURRENT, supplier); + closeables.push(handleC.transportService); + + final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomPositiveLong()).build(); + + Settings hostsSettingsA = Settings.builder() + .putArray("discovery.zen.ping.unicast.hosts", + NetworkAddress.format(new InetSocketAddress(handleA.address.address().getAddress(), handleA.address.address().getPort())), + NetworkAddress.format(new InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort()))) + .put("cluster.name", "test") + .build(); + + UnicastZenPing zenPingA = new UnicastZenPing(hostsSettingsA, threadPool, handleA.transportService, EMPTY_HOSTS_PROVIDER); + zenPingA.start(new PingContextProvider() { + @Override + public DiscoveryNodes nodes() { + return DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A").build(); + } + + @Override + public ClusterState clusterState() { + return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build(); + } + }); + closeables.push(zenPingA); + + Settings hostsSettingsB = Settings.builder() + .putArray("discovery.zen.ping.unicast.hosts", + NetworkAddress.format(new InetSocketAddress(handleA.address.address().getAddress(), handleA.address.address().getPort())), + NetworkAddress.format(new InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort())), + NetworkAddress.format(new InetSocketAddress(handleC.address.address().getAddress(), handleC.address.address().getPort()))) + .put("cluster.name", "test") + .build(); + UnicastZenPing zenPingB = new UnicastZenPing(hostsSettingsB, threadPool, handleB.transportService, EMPTY_HOSTS_PROVIDER); + zenPingB.start(new PingContextProvider() { + @Override + public DiscoveryNodes nodes() { + return DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B").build(); + } + + @Override + public ClusterState clusterState() { + return state; + } + }); + closeables.push(zenPingB); + + Settings hostsSettingsC = Settings.builder() + .putArray("discovery.zen.ping.unicast.hosts", + NetworkAddress.format(new InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort())), + NetworkAddress.format(new InetSocketAddress(handleC.address.address().getAddress(), handleC.address.address().getPort()))) + .put("cluster.name", "test") + .build(); + UnicastZenPing zenPingC = new UnicastZenPing(hostsSettingsC, threadPool, handleC.transportService, EMPTY_HOSTS_PROVIDER); + zenPingC.start(new PingContextProvider() { + @Override + public DiscoveryNodes nodes() { + return DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C").build(); + } + + @Override + public ClusterState clusterState() { + return state; + } + }); + closeables.push(zenPingC); + + + logger.info("ping from UZP_C"); + Collection pingResponsesC = zenPingC.pingAndWait(TimeValue.timeValueSeconds(1)); + assertThat(pingResponsesC.size(), equalTo(1)); + ZenPing.PingResponse pingC = pingResponsesC.iterator().next(); + assertThat(pingC.node().getId(), equalTo("UZP_B")); + + logger.info("ping from UZP_A"); + Collection pingResponsesA = zenPingA.pingAndWait(TimeValue.timeValueSeconds(1)); + assertThat(pingResponsesA.size(), equalTo(2)); + List responseIds = + pingResponsesA.stream().map(response -> response.node().getId()).collect(Collectors.toList()); + assertThat(responseIds, hasItem("UZP_B")); + assertThat(responseIds, hasItem("UZP_C")); + } + public void testUnknownHostNotCached() { // use ephemeral ports final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build(); From 62b7a7e60b34e8ef784ea8b09d7586154081bd29 Mon Sep 17 00:00:00 2001 From: Xiang Chen Date: Mon, 28 Nov 2016 18:14:07 +0800 Subject: [PATCH 3/3] update Tuple type to HashSet type, and rename testShouldPingDiscoveredNodes to testDiscoveryOfNoneConfiguredNodes --- .../discovery/zen/UnicastZenPing.java | 35 ++++++++----------- .../discovery/zen/UnicastZenPingTests.java | 3 +- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index 1abcd52933346..52aa9afd48113 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -293,7 +293,7 @@ Collection pingAndWait(TimeValue duration) { */ @Override public void ping(final PingListener listener, final TimeValue duration) { - final Tuple, HashSet> nodesToPing = buildNodesToPing(); + final HashSet nodesToPing = buildNodesToPing(); final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingHandlerIdGenerator.incrementAndGet()); try { @@ -383,7 +383,7 @@ void sendPings( final TimeValue timeout, @Nullable TimeValue waitTime, final SendPingsHandler sendPingsHandler, - final Tuple, HashSet> initialNodesToPingSet) { + final HashSet initialNodesToPingSet) { final UnicastPingRequest pingRequest = new UnicastPingRequest(); pingRequest.id = sendPingsHandler.id(); pingRequest.timeout = timeout; @@ -391,7 +391,7 @@ void sendPings( pingRequest.pingResponse = createPingResponse(discoNodes); - HashSet nodesToPingSet = new HashSet<>(initialNodesToPingSet.v2()); + HashSet nodesToPingSet = new HashSet<>(initialNodesToPingSet); // Only send pings to nodes that have the same cluster name. Set sameNameNodes = temporalResponses.stream() @@ -400,12 +400,7 @@ void sendPings( nodesToPingSet.addAll(sameNameNodes); // sort the nodes by likelihood of being an active master - List sortedNodesToPing = ElectMasterService.sortByMasterLikelihood(nodesToPingSet); - - // add the configured hosts first - final List nodesToPing = new ArrayList<>(initialNodesToPingSet.v1().size() + sortedNodesToPing.size()); - nodesToPing.addAll(initialNodesToPingSet.v1()); - nodesToPing.addAll(sortedNodesToPing); + List nodesToPing = ElectMasterService.sortByMasterLikelihood(nodesToPingSet); final CountDownLatch latch = new CountDownLatch(nodesToPing.size()); for (final DiscoveryNode node : nodesToPing) { @@ -503,16 +498,7 @@ public void run() { } } - private Tuple, HashSet> buildNodesToPing() { - HashSet nodesToPingSet = new HashSet<>(); - - nodesToPingSet.addAll(hostsProvider.buildDynamicNodes()); - - // add all possible master nodes that were active in the last known cluster configuration - for (ObjectCursor masterNode : contextProvider.nodes().getMasterNodes().values()) { - nodesToPingSet.add(masterNode.value); - } - + private HashSet buildNodesToPing() { final List resolvedDiscoveryNodes; try { resolvedDiscoveryNodes = resolveDiscoveryNodes( @@ -527,7 +513,16 @@ private Tuple, HashSet> buildNodesToPing() { throw new RuntimeException(e); } - return Tuple.tuple(resolvedDiscoveryNodes, nodesToPingSet); + HashSet nodesToPingSet = new HashSet<>(resolvedDiscoveryNodes); + + nodesToPingSet.addAll(hostsProvider.buildDynamicNodes()); + + // add all possible master nodes that were active in the last known cluster configuration + for (ObjectCursor masterNode : contextProvider.nodes().getMasterNodes().values()) { + nodesToPingSet.add(masterNode.value); + } + + return nodesToPingSet; } private void sendPingRequestToNode(final int id, final TimeValue timeout, final UnicastPingRequest pingRequest, diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index c361ff4d08f4e..11d680690cdf6 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -298,7 +298,8 @@ public ClusterState clusterState() { verify(unicastHostsProviderA, times(1)).buildDynamicNodes(); } - public void testShouldPingDiscoveredNodes() throws IOException, InterruptedException { + //test that nodes discover each other if they ping a common host + public void testDiscoveryOfNoneConfiguredNodes() throws IOException, InterruptedException { final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build(); NetworkService networkService = new NetworkService(settings, Collections.emptyList());