From 94c61b5b0a4de77dcc60ce80dc1b03d817e81013 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 23 Dec 2016 13:40:54 +0100 Subject: [PATCH] TransportService should capture listener before spawning background notification task Not doing this made it difficult to establish a happens before relationship between connecting to a node and adding a listeners. Causing test code like this to fail sproadically: ``` // connection to reuse handleA.transportService.connectToNode(handleB.node); // install a listener to check that no new connections are made handleA.transportService.addConnectionListener(new TransportConnectionListener() { @Override public void onConnectionOpened(DiscoveryNode node) { fail("should not open any connections. got [" + node + "]"); } }); ``` relates to #22277 --- .../transport/TransportService.java | 21 ++++++++++--------- .../discovery/zen/UnicastZenPingTests.java | 2 -- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 6d2c331153b15..6c3842541af61 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -64,6 +64,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Stream; import static java.util.Collections.emptyList; import static org.elasticsearch.common.settings.Setting.listSetting; @@ -816,20 +817,20 @@ protected void checkForTimeout(long requestId) { @Override public void onNodeConnected(final DiscoveryNode node) { - threadPool.generic().execute(() -> { - for (TransportConnectionListener connectionListener : connectionListeners) { - connectionListener.onNodeConnected(node); - } - }); + // capture listeners before spawning the background callback so the following pattern won't trigger a call + // connectToNode(); connection is completed successfully + // addConnectionListener(); this listener shouldn't be called + final Stream listenersToNotify = TransportService.this.connectionListeners.stream(); + threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onNodeConnected(node))); } @Override public void onConnectionOpened(DiscoveryNode node) { - threadPool.generic().execute(() -> { - for (TransportConnectionListener connectionListener : connectionListeners) { - connectionListener.onConnectionOpened(node); - } - }); + // capture listeners before spawning the background callback so the following pattern won't trigger a call + // connectToNode(); connection is completed successfully + // addConnectionListener(); this listener shouldn't be called + final Stream listenersToNotify = TransportService.this.connectionListeners.stream(); + threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onConnectionOpened(node))); } @Override diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index 4e056c3556fea..f2923f8c792f6 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -42,7 +42,6 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -542,7 +541,6 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi } } - @TestLogging("org.elasticsearch:DEBUG,org.elasticsearch.discovery:TRACE,org.elasticsearch.transport:TRACE") public void testResolveReuseExistingNodeConnections() throws ExecutionException, InterruptedException { final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build();