From f1a821f0a9dd66d8b24a01b2cdf342ff6a2a12fc Mon Sep 17 00:00:00 2001
From: Simone Bordet
Date: Tue, 1 Sep 2020 13:08:29 +0200
Subject: [PATCH 1/2] Fixes #5217 - Review RoundRobinConnectionPool
Introduced IndexedConnectionPool and RandomConnectionPool.
Clarified semantic of RoundRobinConnectionPool.
Signed-off-by: Simone Bordet
---
.../jetty/client/AbstractConnectionPool.java | 27 ++++++-
.../jetty/client/IndexedConnectionPool.java | 79 +++++++++++++++++++
.../jetty/client/RandomConnectionPool.java | 43 ++++++++++
.../client/RoundRobinConnectionPool.java | 60 ++++++--------
.../jetty/client/ConnectionPoolTest.java | 3 +-
.../client/RoundRobinConnectionPoolTest.java | 16 ++--
6 files changed, 184 insertions(+), 44 deletions(-)
create mode 100644 jetty-client/src/main/java/org/eclipse/jetty/client/IndexedConnectionPool.java
create mode 100644 jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
index 6ec1ee474254..4832590f7cb2 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
@@ -48,6 +48,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
private final HttpDestination destination;
private final Callback requester;
private final Pool pool;
+ private boolean maximizeConnections;
/**
* @deprecated use {@link #AbstractConnectionPool(HttpDestination, int, boolean, Callback)} instead
@@ -151,11 +152,28 @@ public boolean isEmpty()
}
@Override
+ @ManagedAttribute("Whether this pool is closed")
public boolean isClosed()
{
return pool.isClosed();
}
+ @ManagedAttribute("Whether the pool tries to maximize the number of connections used")
+ public boolean isMaximizeConnections()
+ {
+ return maximizeConnections;
+ }
+
+ /**
+ * Sets whether the number of connections should be maximized.
+ *
+ * @param maximizeConnections whether the number of connections should be maximized
+ */
+ public void setMaximizeConnections(boolean maximizeConnections)
+ {
+ this.maximizeConnections = maximizeConnections;
+ }
+
@Override
public Connection acquire()
{
@@ -164,7 +182,8 @@ public Connection acquire()
/**
* Returns an idle connection, if available;
- * if an idle connection is not available, and the given {@code create} parameter is {@code true},
+ * if an idle connection is not available, and the given {@code create} parameter is {@code true}
+ * or {@link #isMaximizeConnections()} is {@code true},
* then schedules the opening of a new connection, if possible within the configuration of this
* connection pool (for example, if it does not exceed the max connection count);
* otherwise returns {@code null}.
@@ -178,7 +197,7 @@ protected Connection acquire(boolean create)
if (LOG.isDebugEnabled())
LOG.debug("Acquiring create={} on {}", create, this);
Connection connection = activate();
- if (connection == null && create)
+ if (connection == null && (create || isMaximizeConnections()))
{
tryCreate(destination.getQueuedRequestCount());
connection = activate();
@@ -357,8 +376,8 @@ protected void removed(Connection connection)
}
/**
- * @deprecated Relying on this method indicates a reliance on the implementation details.
* @return an unmodifiable queue working as a view of the idle connections.
+ * @deprecated Relying on this method indicates a reliance on the implementation details.
*/
@Deprecated
public Queue getIdleConnections()
@@ -371,8 +390,8 @@ public Queue getIdleConnections()
}
/**
- * @deprecated Relying on this method indicates a reliance on the implementation details.
* @return an unmodifiable collection working as a view of the active connections.
+ * @deprecated Relying on this method indicates a reliance on the implementation details.
*/
@Deprecated
public Collection getActiveConnections()
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/IndexedConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/IndexedConnectionPool.java
new file mode 100644
index 000000000000..caf71b2621cf
--- /dev/null
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/IndexedConnectionPool.java
@@ -0,0 +1,79 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.client;
+
+import org.eclipse.jetty.client.api.Connection;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.Pool;
+import org.eclipse.jetty.util.annotation.ManagedObject;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+
+/**
+ * A {@link MultiplexConnectionPool} that picks connections at a particular
+ * index between {@code 0} and {@link #getMaxConnectionCount()}.
+ * The algorithm that decides the index value is decided by subclasses.
+ * To acquire a connection, this class obtains the index value and attempts
+ * to activate the pool entry at that index.
+ * If this activation fails, another attempt to activate an alternative pool
+ * entry is performed, to avoid stalling connection acquisition if there is
+ * an available entry at a different index.
+ */
+@ManagedObject
+public abstract class IndexedConnectionPool extends MultiplexConnectionPool
+{
+ private static final Logger LOG = Log.getLogger(IndexedConnectionPool.class);
+
+ private final Pool pool;
+
+ public IndexedConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
+ {
+ super(destination, maxConnections, cache, requester, maxMultiplex);
+ pool = destination.getBean(Pool.class);
+ }
+
+ /**
+ * Must return an index between 0 (inclusive) and {@code maxConnections} (exclusive)
+ * used to attempt to acquire the connection at that index in the pool.
+ *
+ * @param maxConnections the upper bound of the index (exclusive)
+ * @return an index between 0 (inclusive) and {@code maxConnections} (exclusive)
+ */
+ protected abstract int getIndex(int maxConnections);
+
+ @Override
+ protected Connection activate()
+ {
+ int index = getIndex(getMaxConnectionCount());
+ Pool.Entry entry = pool.acquireAt(index);
+ if (LOG.isDebugEnabled())
+ LOG.debug("activating at index={} entry={}", index, entry);
+ if (entry == null)
+ {
+ entry = pool.acquire();
+ if (LOG.isDebugEnabled())
+ LOG.debug("activating alternative entry={}", entry);
+ }
+ if (entry == null)
+ return null;
+ Connection connection = entry.getPooled();
+ acquired(connection);
+ return connection;
+ }
+}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java
new file mode 100644
index 000000000000..10bf4e23727e
--- /dev/null
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java
@@ -0,0 +1,43 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.client;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.annotation.ManagedObject;
+
+/**
+ * An indexed {@link ConnectionPool} that provides connections
+ * randomly among the ones that are available.
+ */
+@ManagedObject
+public class RandomConnectionPool extends IndexedConnectionPool
+{
+ public RandomConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
+ {
+ super(destination, maxConnections, cache, requester, maxMultiplex);
+ }
+
+ @Override
+ protected int getIndex(int maxConnections)
+ {
+ return ThreadLocalRandom.current().nextInt(maxConnections);
+ }
+}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java
index 7b909f01d10a..0199c2a201d9 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java
@@ -18,22 +18,34 @@
package org.eclipse.jetty.client;
-import org.eclipse.jetty.client.api.Connection;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.eclipse.jetty.util.Callback;
-import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject;
-import org.eclipse.jetty.util.log.Log;
-import org.eclipse.jetty.util.log.Logger;
-import org.eclipse.jetty.util.thread.Locker;
+/**
+ * A {@link ConnectionPool} that attempts to provide connections using a round-robin algorithm.
+ * The round-robin behavior is almost impossible to achieve for several reasons:
+ *
+ * - the server takes different times to serve different requests; if a request takes a long
+ * time to be processed by the server, it would be a performance penalty to stall sending requests
+ * waiting for that connection to be available - better skip it and try another connection
+ * - connections may be closed by the client or by the server, so it should be a performance
+ * penalty to stall sending requests waiting for a new connection to be opened
+ * - thread scheduling on both client and server may temporarily penalize a connection
+ *
+ * Do not expect this class to provide connections in a perfect recurring sequence such as
+ * {@code c0, c1, ..., cN-1, c0, c1, ..., cN-1, c0, c1, ...} because that is impossible to
+ * achieve in a real environment.
+ * This class will just attempt a best-effort to provide the connections in a sequential order,
+ * but most likely the order will be quasi-random.
+ *
+ * @see RandomConnectionPool
+ */
@ManagedObject
-public class RoundRobinConnectionPool extends MultiplexConnectionPool
+public class RoundRobinConnectionPool extends IndexedConnectionPool
{
- private static final Logger LOG = Log.getLogger(RoundRobinConnectionPool.class);
-
- private final Locker lock = new Locker();
- private final Pool pool;
- private int offset;
+ private final AtomicInteger offset = new AtomicInteger();
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
@@ -43,36 +55,16 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections,
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, false, requester, maxMultiplex);
- pool = destination.getBean(Pool.class);
- }
-
- @Override
- protected Connection acquire(boolean create)
- {
// If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace
// those that were closed to process queued requests.
- return super.acquire(true);
+ setMaximizeConnections(true);
}
@Override
- protected Connection activate()
+ protected int getIndex(int maxConnections)
{
- Pool.Entry entry;
- try (Locker.Lock l = lock.lock())
- {
- int index = Math.abs(offset % pool.getMaxEntries());
- entry = pool.acquireAt(index);
- if (LOG.isDebugEnabled())
- LOG.debug("activated at index={} entry={}", index, entry);
- if (entry != null)
- ++offset;
- }
- if (entry == null)
- return null;
- Connection connection = entry.getPooled();
- acquired(connection);
- return connection;
+ return Math.abs(offset.getAndIncrement() % maxConnections);
}
}
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java
index 334b9b6a6d6e..1dadb6b2ef4b 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java
@@ -71,7 +71,8 @@ public static Stream poolsNoRoundRobin()
{
return Stream.of(
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
- new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
+ new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)),
+ new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), true, destination, 1))
);
}
diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java
index ad63a8e2d292..ad6d040bb8b8 100644
--- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java
+++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java
@@ -42,6 +42,7 @@
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -194,9 +195,9 @@ public void testMultiplexWithMaxUsage(Transport transport) throws Exception
multiplex = 2;
int maxMultiplex = multiplex;
- int maxUsage = 2;
- int maxConnections = 2;
- int count = maxConnections * maxMultiplex * maxUsage;
+ int maxUsage = 3;
+ int maxConnections = 4;
+ int count = 2 * maxConnections * maxMultiplex * maxUsage;
List remotePorts = new CopyOnWriteArrayList<>();
scenario.start(new EmptyServerHandler()
@@ -229,9 +230,14 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
assertTrue(clientLatch.await(count, TimeUnit.SECONDS));
assertEquals(count, remotePorts.size());
+ // Maps {remote_port -> number_of_times_port_was_used}.
Map results = remotePorts.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
- assertEquals(count / maxUsage, results.size(), remotePorts.toString());
- assertEquals(1, results.values().stream().distinct().count(), remotePorts.toString());
+ // RoundRobinConnectionPool may open more connections than expected.
+ // For example with maxUsage=2, requests could be sent to these ports:
+ // [p1, p2, p3 | p1, p2, p3 | p4, p4, p5 | p6, p5, p7]
+ // Opening p5 and p6 was delayed, so the opening of p7 was triggered
+ // to replace p4 while p5 and p6 were busy sending their requests.
+ assertThat(remotePorts.toString(), count / maxUsage, lessThanOrEqualTo(results.size()));
}
}
From d20cea6c94fd62df9b672fe91c522bb5c9d09910 Mon Sep 17 00:00:00 2001
From: Simone Bordet
Date: Tue, 1 Sep 2020 19:14:01 +0200
Subject: [PATCH 2/2] Fixes #5217 - Review RoundRobinConnectionPool
Updates after review.
Fixed broken tests.
Signed-off-by: Simone Bordet
---
.../jetty/client/IndexedConnectionPool.java | 4 +-
.../jetty/client/RandomConnectionPool.java | 4 +-
.../client/RoundRobinConnectionPool.java | 9 ++-
.../jetty/client/ConnectionPoolTest.java | 2 +-
.../client/RoundRobinConnectionPoolTest.java | 57 +++++++++++++------
5 files changed, 51 insertions(+), 25 deletions(-)
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/IndexedConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/IndexedConnectionPool.java
index caf71b2621cf..587a79345856 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/IndexedConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/IndexedConnectionPool.java
@@ -42,9 +42,9 @@ public abstract class IndexedConnectionPool extends MultiplexConnectionPool
private final Pool pool;
- public IndexedConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
+ public IndexedConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
- super(destination, maxConnections, cache, requester, maxMultiplex);
+ super(destination, maxConnections, false, requester, maxMultiplex);
pool = destination.getBean(Pool.class);
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java
index 10bf4e23727e..6eb44042a900 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java
@@ -30,9 +30,9 @@
@ManagedObject
public class RandomConnectionPool extends IndexedConnectionPool
{
- public RandomConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
+ public RandomConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
- super(destination, maxConnections, cache, requester, maxMultiplex);
+ super(destination, maxConnections, requester, maxMultiplex);
}
@Override
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java
index 0199c2a201d9..d9cb2c590e57 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java
@@ -39,6 +39,11 @@
* achieve in a real environment.
* This class will just attempt a best-effort to provide the connections in a sequential order,
* but most likely the order will be quasi-random.
+ * Applications using this class should {@link #preCreateConnections(int) pre-create}
+ * the connections to ensure that they are already opened when the application starts to requests
+ * them, otherwise the first connection that is opened may be used multiple times before the others
+ * are opened, resulting in a behavior that is more random-like than more round-robin-like (and
+ * that confirms that round-robin behavior is almost impossible to achieve).
*
* @see RandomConnectionPool
*/
@@ -54,7 +59,7 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections,
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
- super(destination, maxConnections, false, requester, maxMultiplex);
+ super(destination, maxConnections, requester, maxMultiplex);
// If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace
@@ -65,6 +70,6 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections,
@Override
protected int getIndex(int maxConnections)
{
- return Math.abs(offset.getAndIncrement() % maxConnections);
+ return offset.getAndUpdate(v -> ++v == maxConnections ? 0 : v);
}
}
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java
index 1dadb6b2ef4b..0a679a1b155e 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java
@@ -72,7 +72,7 @@ public static Stream poolsNoRoundRobin()
return Stream.of(
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)),
- new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), true, destination, 1))
+ new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
);
}
diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java
index ad6d040bb8b8..5df851017e5f 100644
--- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java
+++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@@ -72,17 +73,21 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
});
int maxConnections = 3;
- scenario.client.getTransport().setConnectionPoolFactory(destination -> new RoundRobinConnectionPool(destination, maxConnections, destination));
-
- // Prime the connections, so that they are all opened
- // before we actually test the round robin behavior.
- for (int i = 0; i < maxConnections; ++i)
+ CompletableFuture setup = new CompletableFuture<>();
+ scenario.client.getTransport().setConnectionPoolFactory(destination ->
{
- ContentResponse response = scenario.client.newRequest(scenario.newURI())
- .timeout(5, TimeUnit.SECONDS)
- .send();
- assertEquals(HttpStatus.OK_200, response.getStatus());
- }
+ RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination);
+ pool.preCreateConnections(maxConnections).handle((r, x) -> x != null ? setup.completeExceptionally(x) : setup.complete(null));
+ return pool;
+ });
+
+ // Send one request to trigger destination creation
+ // and connection pool pre-creation of connections,
+ // so we can test reliably the round-robin behavior.
+ scenario.client.newRequest(scenario.newURI())
+ .timeout(5, TimeUnit.SECONDS)
+ .send();
+ setup.get(5, TimeUnit.SECONDS);
record.set(true);
int requests = 2 * maxConnections - 1;
@@ -119,6 +124,7 @@ public void testMultiplex(Transport transport) throws Exception
int maxConnections = 3;
int count = maxConnections * maxMultiplex;
+ AtomicBoolean record = new AtomicBoolean();
List remotePorts = new CopyOnWriteArrayList<>();
AtomicReference requestLatch = new AtomicReference<>();
CountDownLatch serverLatch = new CountDownLatch(count);
@@ -130,10 +136,13 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
{
try
{
- remotePorts.add(request.getRemotePort());
- requestLatch.get().countDown();
- serverLatch.countDown();
- barrier.await();
+ if (record.get())
+ {
+ remotePorts.add(request.getRemotePort());
+ requestLatch.get().countDown();
+ serverLatch.countDown();
+ barrier.await();
+ }
}
catch (Exception x)
{
@@ -142,11 +151,23 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
}
});
- scenario.client.getTransport().setConnectionPoolFactory(destination -> new RoundRobinConnectionPool(destination, maxConnections, destination, maxMultiplex));
+ CompletableFuture setup = new CompletableFuture<>();
+ scenario.client.getTransport().setConnectionPoolFactory(destination ->
+ {
+ RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination);
+ pool.preCreateConnections(maxConnections).handle((r, x) -> x != null ? setup.completeExceptionally(x) : setup.complete(null));
+ return pool;
+ });
- // Do not prime the connections, to see if the behavior is
- // correct even if the connections are not pre-created.
+ // Send one request to trigger destination creation
+ // and connection pool pre-creation of connections,
+ // so we can test reliably the round-robin behavior.
+ scenario.client.newRequest(scenario.newURI())
+ .timeout(5, TimeUnit.SECONDS)
+ .send();
+ setup.get(5, TimeUnit.SECONDS);
+ record.set(true);
CountDownLatch clientLatch = new CountDownLatch(count);
AtomicInteger requests = new AtomicInteger();
for (int i = 0; i < count; ++i)
@@ -172,7 +193,7 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
barrier.await();
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
- assertThat(remotePorts.size(), Matchers.equalTo(count));
+ assertThat(remotePorts.toString(), remotePorts.size(), Matchers.equalTo(count));
for (int i = 0; i < count; ++i)
{
int base = i % maxConnections;