diff --git a/core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java b/core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java index 97f4c3349c03c..2bd35b8ae62e5 100644 --- a/core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java +++ b/core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.transport; +import org.elasticsearch.common.unit.TimeValue; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -42,14 +44,16 @@ public final class ConnectionProfile { TransportRequestOptions.Type.PING, TransportRequestOptions.Type.RECOVERY, TransportRequestOptions.Type.REG, - TransportRequestOptions.Type.STATE)), 1); + TransportRequestOptions.Type.STATE)), 1, null); private final List handles; private final int numConnections; + private final TimeValue connectTimeout; - private ConnectionProfile(List handles, int numConnections) { + private ConnectionProfile(List handles, int numConnections, TimeValue timeout) { this.handles = handles; this.numConnections = numConnections; + connectTimeout = timeout; } /** @@ -59,6 +63,17 @@ public static class Builder { private final List handles = new ArrayList<>(); private final Set addedTypes = EnumSet.noneOf(TransportRequestOptions.Type.class); private int offset = 0; + private TimeValue connectTimeout; + + /** + * Sets a connect timeout for this connection profile + */ + public void setConnectTimeout(TimeValue timeout) { + if (timeout.millis() < 0) { + throw new IllegalArgumentException("timeout must be positive but was: " + timeout); + } + this.connectTimeout = timeout; + } /** * Adds a number of connections for one or more types. Each type can only be added once. @@ -89,8 +104,16 @@ public ConnectionProfile build() { if (types.isEmpty() == false) { throw new IllegalStateException("not all types are added for this connection profile - missing types: " + types); } - return new ConnectionProfile(Collections.unmodifiableList(handles), offset); + return new ConnectionProfile(Collections.unmodifiableList(handles), offset, connectTimeout); } + + } + + /** + * Returns the connect timeout or null if no explicit timeout is set on this profile. + */ + public TimeValue getConnectTimeout() { + return connectTimeout; } /** diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 11e8de3c3cc2f..5cba86b6067e4 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -154,7 +154,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i protected final int connectionsPerNodeReg; protected final int connectionsPerNodeState; protected final int connectionsPerNodePing; - protected final TimeValue connectTimeout; protected final boolean blockingClient; private final CircuitBreakerService circuitBreakerService; // package visibility for tests @@ -200,9 +199,9 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo this.connectionsPerNodeReg = CONNECTIONS_PER_NODE_REG.get(settings); this.connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.get(settings); this.connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings); - this.connectTimeout = TCP_CONNECT_TIMEOUT.get(settings); this.blockingClient = TCP_BLOCKING_CLIENT.get(settings); ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + builder.setConnectTimeout(TCP_CONNECT_TIMEOUT.get(settings)); builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK); builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING); builder.addConnections(connectionsPerNodeRecovery, TransportRequestOptions.Type.RECOVERY); @@ -1373,4 +1372,8 @@ public void onFailure(Exception e) { } } } + + protected final TimeValue getDefaultConnectTimeout() { + return defaultConnectionProfile.getConnectTimeout(); + } } diff --git a/core/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java b/core/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java index 04973e70cb37d..b09fb9c7e5a40 100644 --- a/core/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java +++ b/core/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.transport; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; @@ -25,6 +26,11 @@ public class ConnectionProfileTests extends ESTestCase { public void testBuildConnectionProfile() { ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + TimeValue connectTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 10)); + final boolean setConnectTimeout = randomBoolean(); + if (setConnectTimeout) { + builder.setConnectTimeout(connectTimeout); + } builder.addConnections(1, TransportRequestOptions.Type.BULK); builder.addConnections(2, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY); builder.addConnections(3, TransportRequestOptions.Type.PING); @@ -37,6 +43,11 @@ public void testBuildConnectionProfile() { builder.addConnections(4, TransportRequestOptions.Type.REG); ConnectionProfile build = builder.build(); assertEquals(10, build.getNumConnections()); + if (setConnectTimeout) { + assertEquals(connectTimeout, build.getConnectTimeout()); + } else { + assertNull(build.getConnectTimeout()); + } Integer[] array = new Integer[10]; for (int i = 0; i < array.length; i++) { array[i] = i; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index f70d9ab6d0fb1..0dd526eff3dfb 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -55,6 +55,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.FutureUtils; @@ -203,7 +204,7 @@ private Bootstrap createBootstrap() { bootstrap.handler(getClientChannelInitializer()); - bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.millis())); + bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(getDefaultConnectTimeout().millis())); bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings)); bootstrap.option(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings)); @@ -269,8 +270,8 @@ private void createServerBootstrap(String name, Settings settings) { logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], " + "connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]", name, workerCount, settings.get("port"), settings.get("bind_host"), settings.get("publish_host"), compress, - connectTimeout, connectionsPerNodeRecovery, connectionsPerNodeBulk, connectionsPerNodeReg, connectionsPerNodeState, - connectionsPerNodePing, receivePredictorMin, receivePredictorMax); + getDefaultConnectTimeout(), connectionsPerNodeRecovery, connectionsPerNodeBulk, connectionsPerNodeReg, + connectionsPerNodeState, connectionsPerNodePing, receivePredictorMin, receivePredictorMax); } final ThreadFactory workerFactory = daemonThreadFactory(this.settings, TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, name); @@ -338,7 +339,17 @@ protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile p final NodeChannels nodeChannels = new NodeChannels(channels, profile); boolean success = false; try { - int numConnections = channels.length; + final int numConnections = channels.length; + final TimeValue connectTimeout; + final Bootstrap bootstrap; + if (profile.getConnectTimeout() != null && profile.getConnectTimeout().equals(getDefaultConnectTimeout()) == false) { + bootstrap = this.bootstrap.clone(this.bootstrap.config().group()); + bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(profile.getConnectTimeout().millis())); + connectTimeout = profile.getConnectTimeout(); + } else { + connectTimeout = getDefaultConnectTimeout(); + bootstrap = this.bootstrap; + } final ArrayList connections = new ArrayList<>(numConnections); final InetSocketAddress address = node.getAddress().address(); for (int i = 0; i < numConnections; i++) { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index cc32fb3f6ac50..9488f039f75fc 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -44,7 +44,12 @@ import org.junit.Before; import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.sql.Time; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -1721,4 +1726,45 @@ public void testRegisterHandlerTwice() { serviceA.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), (request, message) -> {throw new AssertionError("boom");}); } + + public void testTimeoutPerConnection() throws IOException { + try (ServerSocket socket = new ServerSocket()) { + // nocommit - this test uses backlog=1 which is implementation specific ie. it might not work on some TCP/IP stacks + // on linux (at least newer ones) the listen(addr, backlog=1) should just ignore new connections if the queue is full which + // means that oncewe received an ACK from the client we just drop the packet on the floor (which is what we want) and we run + // into a connection timeout quickly. Yet other implementations can for instance can terminate the connection within the 3 way + // handshake which I haven't tested yet. + socket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0), 1); + socket.setReuseAddress(true); + DiscoveryNode first = new DiscoveryNode("TEST", new TransportAddress(socket.getInetAddress(), + socket.getLocalPort()), emptyMap(), + emptySet(), version0); + DiscoveryNode second = new DiscoveryNode("TEST", new TransportAddress(socket.getInetAddress(), + socket.getLocalPort()), emptyMap(), + emptySet(), version0); + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + builder.addConnections(1, + TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.PING, + TransportRequestOptions.Type.RECOVERY, + TransportRequestOptions.Type.REG, + TransportRequestOptions.Type.STATE); + + // connection with one connection and a large timeout -- should consume the one spot in the queu + serviceA.connectToNode(first, builder.build()); + builder.setConnectTimeout(TimeValue.timeValueMillis(1)); + final ConnectionProfile profile = builder.build(); + // now with the 1ms timeout we got and test that is it's applied + long startTime = System.nanoTime(); + ConnectTransportException ex = expectThrows(ConnectTransportException.class, () -> { + serviceA.connectToNode(second, profile); + }); + final long now = System.nanoTime(); + final long timeTaken = TimeValue.nsecToMSec(now - startTime); + assertTrue("test didn't timeout quick enough, time taken: [" + timeTaken + "]", + timeTaken < TimeValue.timeValueSeconds(5).millis()); + logger.warn("", ex); + assertEquals(ex.getMessage(), "[][" + second.getAddress() + "] connect_timeout[1ms]"); + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index fc33ce3c6350e..3997a244b4cb4 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -46,6 +47,7 @@ import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; +import java.net.SocketTimeoutException; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -178,7 +180,12 @@ protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile p final InetSocketAddress address = node.getAddress().address(); // we just use a single connections configureSocket(socket); - socket.connect(address, (int) TCP_CONNECT_TIMEOUT.get(settings).millis()); + TimeValue connectTimeout = profile.getConnectTimeout() == null ? getDefaultConnectTimeout() : profile.getConnectTimeout(); + try { + socket.connect(address, Math.toIntExact(connectTimeout.millis())); + } catch (SocketTimeoutException ex) { + throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", ex); + } MockChannel channel = new MockChannel(socket, address, "none", onClose); channel.loopRead(executor); mockChannels[0] = channel;