Skip to content

Commit

Permalink
TransportService should capture listener before spawning background n…
Browse files Browse the repository at this point in the history
…otification task

Not doing this made it difficult to establish a happens before relationship between connecting to a node and adding a listeners. Causing test code like this to fail sproadically:

```
        // connection to reuse
        handleA.transportService.connectToNode(handleB.node);

        // install a listener to check that no new connections are made
        handleA.transportService.addConnectionListener(new TransportConnectionListener() {
            @OverRide
            public void onConnectionOpened(DiscoveryNode node) {
                fail("should not open any connections. got [" + node + "]");
            }
        });

```

relates to #22277
  • Loading branch information
bleskes committed Dec 23, 2016
1 parent 3bbdb11 commit 94c61b5
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static java.util.Collections.emptyList;
import static org.elasticsearch.common.settings.Setting.listSetting;
Expand Down Expand Up @@ -816,20 +817,20 @@ protected void checkForTimeout(long requestId) {

@Override
public void onNodeConnected(final DiscoveryNode node) {
threadPool.generic().execute(() -> {
for (TransportConnectionListener connectionListener : connectionListeners) {
connectionListener.onNodeConnected(node);
}
});
// capture listeners before spawning the background callback so the following pattern won't trigger a call
// connectToNode(); connection is completed successfully
// addConnectionListener(); this listener shouldn't be called
final Stream<TransportConnectionListener> listenersToNotify = TransportService.this.connectionListeners.stream();
threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onNodeConnected(node)));
}

@Override
public void onConnectionOpened(DiscoveryNode node) {
threadPool.generic().execute(() -> {
for (TransportConnectionListener connectionListener : connectionListeners) {
connectionListener.onConnectionOpened(node);
}
});
// capture listeners before spawning the background callback so the following pattern won't trigger a call
// connectToNode(); connection is completed successfully
// addConnectionListener(); this listener shouldn't be called
final Stream<TransportConnectionListener> listenersToNotify = TransportService.this.connectionListeners.stream();
threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onConnectionOpened(node)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -542,7 +541,6 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi
}
}

@TestLogging("org.elasticsearch:DEBUG,org.elasticsearch.discovery:TRACE,org.elasticsearch.transport:TRACE")
public void testResolveReuseExistingNodeConnections() throws ExecutionException, InterruptedException {
final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build();

Expand Down

0 comments on commit 94c61b5

Please sign in to comment.