Skip to content

Commit

Permalink
Add a connect timeout to the ConnectionProfile to allow per node conn…
Browse files Browse the repository at this point in the history
…ect timeouts

Timeouts are global today across all connections this commit allows to specify
a connection timeout per node such that depending on the context connections can
be established with different timeouts.

Relates to elastic#19719
  • Loading branch information
s1monw committed Nov 29, 2016
1 parent f5ff69f commit 51a88d3
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.elasticsearch.transport;

import org.elasticsearch.common.unit.TimeValue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -42,14 +44,16 @@ public final class ConnectionProfile {
TransportRequestOptions.Type.PING,
TransportRequestOptions.Type.RECOVERY,
TransportRequestOptions.Type.REG,
TransportRequestOptions.Type.STATE)), 1);
TransportRequestOptions.Type.STATE)), 1, null);

private final List<ConnectionTypeHandle> handles;
private final int numConnections;
private final TimeValue connectTimeout;

private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections) {
private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue timeout) {
this.handles = handles;
this.numConnections = numConnections;
connectTimeout = timeout;
}

/**
Expand All @@ -59,6 +63,17 @@ public static class Builder {
private final List<ConnectionTypeHandle> handles = new ArrayList<>();
private final Set<TransportRequestOptions.Type> addedTypes = EnumSet.noneOf(TransportRequestOptions.Type.class);
private int offset = 0;
private TimeValue connectTimeout;

/**
* Sets a connect timeout for this connection profile
*/
public void setConnectTimeout(TimeValue timeout) {
if (timeout.millis() < 0) {
throw new IllegalArgumentException("timeout must be positive but was: " + timeout);
}
this.connectTimeout = timeout;
}

/**
* Adds a number of connections for one or more types. Each type can only be added once.
Expand Down Expand Up @@ -89,8 +104,16 @@ public ConnectionProfile build() {
if (types.isEmpty() == false) {
throw new IllegalStateException("not all types are added for this connection profile - missing types: " + types);
}
return new ConnectionProfile(Collections.unmodifiableList(handles), offset);
return new ConnectionProfile(Collections.unmodifiableList(handles), offset, connectTimeout);
}

}

/**
* Returns the connect timeout or <code>null</code> if no explicit timeout is set on this profile.
*/
public TimeValue getConnectTimeout() {
return connectTimeout;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
protected final int connectionsPerNodeReg;
protected final int connectionsPerNodeState;
protected final int connectionsPerNodePing;
protected final TimeValue connectTimeout;
protected final boolean blockingClient;
private final CircuitBreakerService circuitBreakerService;
// package visibility for tests
Expand Down Expand Up @@ -200,9 +199,9 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo
this.connectionsPerNodeReg = CONNECTIONS_PER_NODE_REG.get(settings);
this.connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.get(settings);
this.connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings);
this.connectTimeout = TCP_CONNECT_TIMEOUT.get(settings);
this.blockingClient = TCP_BLOCKING_CLIENT.get(settings);
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.setConnectTimeout(TCP_CONNECT_TIMEOUT.get(settings));
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
builder.addConnections(connectionsPerNodeRecovery, TransportRequestOptions.Type.RECOVERY);
Expand Down Expand Up @@ -1373,4 +1372,8 @@ public void onFailure(Exception e) {
}
}
}

protected final TimeValue getDefaultConnectTimeout() {
return defaultConnectionProfile.getConnectTimeout();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@
*/
package org.elasticsearch.transport;

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;

public class ConnectionProfileTests extends ESTestCase {

public void testBuildConnectionProfile() {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
TimeValue connectTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 10));
final boolean setConnectTimeout = randomBoolean();
if (setConnectTimeout) {
builder.setConnectTimeout(connectTimeout);
}
builder.addConnections(1, TransportRequestOptions.Type.BULK);
builder.addConnections(2, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY);
builder.addConnections(3, TransportRequestOptions.Type.PING);
Expand All @@ -37,6 +43,11 @@ public void testBuildConnectionProfile() {
builder.addConnections(4, TransportRequestOptions.Type.REG);
ConnectionProfile build = builder.build();
assertEquals(10, build.getNumConnections());
if (setConnectTimeout) {
assertEquals(connectTimeout, build.getConnectTimeout());
} else {
assertNull(build.getConnectTimeout());
}
Integer[] array = new Integer[10];
for (int i = 0; i < array.length; i++) {
array[i] = i;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
Expand Down Expand Up @@ -203,7 +204,7 @@ private Bootstrap createBootstrap() {

bootstrap.handler(getClientChannelInitializer());

bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.millis()));
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(getDefaultConnectTimeout().millis()));
bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings));
bootstrap.option(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings));

Expand Down Expand Up @@ -269,8 +270,8 @@ private void createServerBootstrap(String name, Settings settings) {
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], "
+ "connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
name, workerCount, settings.get("port"), settings.get("bind_host"), settings.get("publish_host"), compress,
connectTimeout, connectionsPerNodeRecovery, connectionsPerNodeBulk, connectionsPerNodeReg, connectionsPerNodeState,
connectionsPerNodePing, receivePredictorMin, receivePredictorMax);
getDefaultConnectTimeout(), connectionsPerNodeRecovery, connectionsPerNodeBulk, connectionsPerNodeReg,
connectionsPerNodeState, connectionsPerNodePing, receivePredictorMin, receivePredictorMax);
}

final ThreadFactory workerFactory = daemonThreadFactory(this.settings, TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, name);
Expand Down Expand Up @@ -338,7 +339,17 @@ protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile p
final NodeChannels nodeChannels = new NodeChannels(channels, profile);
boolean success = false;
try {
int numConnections = channels.length;
final int numConnections = channels.length;
final TimeValue connectTimeout;
final Bootstrap bootstrap;
if (profile.getConnectTimeout() != null && profile.getConnectTimeout().equals(getDefaultConnectTimeout()) == false) {
bootstrap = this.bootstrap.clone(this.bootstrap.config().group());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(profile.getConnectTimeout().millis()));
connectTimeout = profile.getConnectTimeout();
} else {
connectTimeout = getDefaultConnectTimeout();
bootstrap = this.bootstrap;
}
final ArrayList<ChannelFuture> connections = new ArrayList<>(numConnections);
final InetSocketAddress address = node.getAddress().address();
for (int i = 0; i < numConnections; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@
import org.junit.Before;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.sql.Time;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -1721,4 +1726,45 @@ public void testRegisterHandlerTwice() {
serviceA.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
(request, message) -> {throw new AssertionError("boom");});
}

public void testTimeoutPerConnection() throws IOException {
try (ServerSocket socket = new ServerSocket()) {
// nocommit - this test uses backlog=1 which is implementation specific ie. it might not work on some TCP/IP stacks
// on linux (at least newer ones) the listen(addr, backlog=1) should just ignore new connections if the queue is full which
// means that oncewe received an ACK from the client we just drop the packet on the floor (which is what we want) and we run
// into a connection timeout quickly. Yet other implementations can for instance can terminate the connection within the 3 way
// handshake which I haven't tested yet.
socket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0), 1);
socket.setReuseAddress(true);
DiscoveryNode first = new DiscoveryNode("TEST", new TransportAddress(socket.getInetAddress(),
socket.getLocalPort()), emptyMap(),
emptySet(), version0);
DiscoveryNode second = new DiscoveryNode("TEST", new TransportAddress(socket.getInetAddress(),
socket.getLocalPort()), emptyMap(),
emptySet(), version0);
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.addConnections(1,
TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.PING,
TransportRequestOptions.Type.RECOVERY,
TransportRequestOptions.Type.REG,
TransportRequestOptions.Type.STATE);

// connection with one connection and a large timeout -- should consume the one spot in the queu
serviceA.connectToNode(first, builder.build());
builder.setConnectTimeout(TimeValue.timeValueMillis(1));
final ConnectionProfile profile = builder.build();
// now with the 1ms timeout we got and test that is it's applied
long startTime = System.nanoTime();
ConnectTransportException ex = expectThrows(ConnectTransportException.class, () -> {
serviceA.connectToNode(second, profile);
});
final long now = System.nanoTime();
final long timeTaken = TimeValue.nsecToMSec(now - startTime);
assertTrue("test didn't timeout quick enough, time taken: [" + timeTaken + "]",
timeTaken < TimeValue.timeValueSeconds(5).millis());
logger.warn("", ex);
assertEquals(ex.getMessage(), "[][" + second.getAddress() + "] connect_timeout[1ms]");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand All @@ -46,6 +47,7 @@
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -178,7 +180,12 @@ protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile p
final InetSocketAddress address = node.getAddress().address();
// we just use a single connections
configureSocket(socket);
socket.connect(address, (int) TCP_CONNECT_TIMEOUT.get(settings).millis());
TimeValue connectTimeout = profile.getConnectTimeout() == null ? getDefaultConnectTimeout() : profile.getConnectTimeout();
try {
socket.connect(address, Math.toIntExact(connectTimeout.millis()));
} catch (SocketTimeoutException ex) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", ex);
}
MockChannel channel = new MockChannel(socket, address, "none", onClose);
channel.loopRead(executor);
mockChannels[0] = channel;
Expand Down

0 comments on commit 51a88d3

Please sign in to comment.