diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java index 6ec1ee474254..4832590f7cb2 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -48,6 +48,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable private final HttpDestination destination; private final Callback requester; private final Pool pool; + private boolean maximizeConnections; /** * @deprecated use {@link #AbstractConnectionPool(HttpDestination, int, boolean, Callback)} instead @@ -151,11 +152,28 @@ public boolean isEmpty() } @Override + @ManagedAttribute("Whether this pool is closed") public boolean isClosed() { return pool.isClosed(); } + @ManagedAttribute("Whether the pool tries to maximize the number of connections used") + public boolean isMaximizeConnections() + { + return maximizeConnections; + } + + /** + *

Sets whether the number of connections should be maximized.

+ * + * @param maximizeConnections whether the number of connections should be maximized + */ + public void setMaximizeConnections(boolean maximizeConnections) + { + this.maximizeConnections = maximizeConnections; + } + @Override public Connection acquire() { @@ -164,7 +182,8 @@ public Connection acquire() /** *

Returns an idle connection, if available; - * if an idle connection is not available, and the given {@code create} parameter is {@code true}, + * if an idle connection is not available, and the given {@code create} parameter is {@code true} + * or {@link #isMaximizeConnections()} is {@code true}, * then schedules the opening of a new connection, if possible within the configuration of this * connection pool (for example, if it does not exceed the max connection count); * otherwise returns {@code null}.

@@ -178,7 +197,7 @@ protected Connection acquire(boolean create) if (LOG.isDebugEnabled()) LOG.debug("Acquiring create={} on {}", create, this); Connection connection = activate(); - if (connection == null && create) + if (connection == null && (create || isMaximizeConnections())) { tryCreate(destination.getQueuedRequestCount()); connection = activate(); @@ -357,8 +376,8 @@ protected void removed(Connection connection) } /** - * @deprecated Relying on this method indicates a reliance on the implementation details. * @return an unmodifiable queue working as a view of the idle connections. + * @deprecated Relying on this method indicates a reliance on the implementation details. */ @Deprecated public Queue getIdleConnections() @@ -371,8 +390,8 @@ public Queue getIdleConnections() } /** - * @deprecated Relying on this method indicates a reliance on the implementation details. * @return an unmodifiable collection working as a view of the active connections. + * @deprecated Relying on this method indicates a reliance on the implementation details. */ @Deprecated public Collection getActiveConnections() diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/IndexedConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/IndexedConnectionPool.java new file mode 100644 index 000000000000..caf71b2621cf --- /dev/null +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/IndexedConnectionPool.java @@ -0,0 +1,79 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import org.eclipse.jetty.client.api.Connection; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Pool; +import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + +/** + *

A {@link MultiplexConnectionPool} that picks connections at a particular + * index between {@code 0} and {@link #getMaxConnectionCount()}.

+ *

The algorithm that decides the index value is decided by subclasses.

+ *

To acquire a connection, this class obtains the index value and attempts + * to activate the pool entry at that index. + * If this activation fails, another attempt to activate an alternative pool + * entry is performed, to avoid stalling connection acquisition if there is + * an available entry at a different index.

+ */ +@ManagedObject +public abstract class IndexedConnectionPool extends MultiplexConnectionPool +{ + private static final Logger LOG = Log.getLogger(IndexedConnectionPool.class); + + private final Pool pool; + + public IndexedConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex) + { + super(destination, maxConnections, cache, requester, maxMultiplex); + pool = destination.getBean(Pool.class); + } + + /** + *

Must return an index between 0 (inclusive) and {@code maxConnections} (exclusive) + * used to attempt to acquire the connection at that index in the pool.

+ * + * @param maxConnections the upper bound of the index (exclusive) + * @return an index between 0 (inclusive) and {@code maxConnections} (exclusive) + */ + protected abstract int getIndex(int maxConnections); + + @Override + protected Connection activate() + { + int index = getIndex(getMaxConnectionCount()); + Pool.Entry entry = pool.acquireAt(index); + if (LOG.isDebugEnabled()) + LOG.debug("activating at index={} entry={}", index, entry); + if (entry == null) + { + entry = pool.acquire(); + if (LOG.isDebugEnabled()) + LOG.debug("activating alternative entry={}", entry); + } + if (entry == null) + return null; + Connection connection = entry.getPooled(); + acquired(connection); + return connection; + } +} diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java new file mode 100644 index 000000000000..10bf4e23727e --- /dev/null +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java @@ -0,0 +1,43 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import java.util.concurrent.ThreadLocalRandom; + +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.annotation.ManagedObject; + +/** + *

An indexed {@link ConnectionPool} that provides connections + * randomly among the ones that are available.

+ */ +@ManagedObject +public class RandomConnectionPool extends IndexedConnectionPool +{ + public RandomConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex) + { + super(destination, maxConnections, cache, requester, maxMultiplex); + } + + @Override + protected int getIndex(int maxConnections) + { + return ThreadLocalRandom.current().nextInt(maxConnections); + } +} diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java index 7b909f01d10a..0199c2a201d9 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java @@ -18,22 +18,34 @@ package org.eclipse.jetty.client; -import org.eclipse.jetty.client.api.Connection; +import java.util.concurrent.atomic.AtomicInteger; + import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.Pool; import org.eclipse.jetty.util.annotation.ManagedObject; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.Locker; +/** + *

A {@link ConnectionPool} that attempts to provide connections using a round-robin algorithm.

+ *

The round-robin behavior is almost impossible to achieve for several reasons:

+ *
    + *
  • the server takes different times to serve different requests; if a request takes a long + * time to be processed by the server, it would be a performance penalty to stall sending requests + * waiting for that connection to be available - better skip it and try another connection
  • + *
  • connections may be closed by the client or by the server, so it should be a performance + * penalty to stall sending requests waiting for a new connection to be opened
  • + *
  • thread scheduling on both client and server may temporarily penalize a connection
  • + *
+ *

Do not expect this class to provide connections in a perfect recurring sequence such as + * {@code c0, c1, ..., cN-1, c0, c1, ..., cN-1, c0, c1, ...} because that is impossible to + * achieve in a real environment. + * This class will just attempt a best-effort to provide the connections in a sequential order, + * but most likely the order will be quasi-random.

+ * + * @see RandomConnectionPool + */ @ManagedObject -public class RoundRobinConnectionPool extends MultiplexConnectionPool +public class RoundRobinConnectionPool extends IndexedConnectionPool { - private static final Logger LOG = Log.getLogger(RoundRobinConnectionPool.class); - - private final Locker lock = new Locker(); - private final Pool pool; - private int offset; + private final AtomicInteger offset = new AtomicInteger(); public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester) { @@ -43,36 +55,16 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) { super(destination, maxConnections, false, requester, maxMultiplex); - pool = destination.getBean(Pool.class); - } - - @Override - protected Connection acquire(boolean create) - { // If there are queued requests and connections get // closed due to idle timeout or overuse, we want to // aggressively try to open new connections to replace // those that were closed to process queued requests. - return super.acquire(true); + setMaximizeConnections(true); } @Override - protected Connection activate() + protected int getIndex(int maxConnections) { - Pool.Entry entry; - try (Locker.Lock l = lock.lock()) - { - int index = Math.abs(offset % pool.getMaxEntries()); - entry = pool.acquireAt(index); - if (LOG.isDebugEnabled()) - LOG.debug("activated at index={} entry={}", index, entry); - if (entry != null) - ++offset; - } - if (entry == null) - return null; - Connection connection = entry.getPooled(); - acquired(connection); - return connection; + return Math.abs(offset.getAndIncrement() % maxConnections); } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java index 334b9b6a6d6e..1dadb6b2ef4b 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java @@ -71,7 +71,8 @@ public static Stream poolsNoRoundRobin() { return Stream.of( new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)), - new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)) + new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)), + new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), true, destination, 1)) ); } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java index ad63a8e2d292..ad6d040bb8b8 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java @@ -42,6 +42,7 @@ import org.junit.jupiter.params.provider.ArgumentsSource; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -194,9 +195,9 @@ public void testMultiplexWithMaxUsage(Transport transport) throws Exception multiplex = 2; int maxMultiplex = multiplex; - int maxUsage = 2; - int maxConnections = 2; - int count = maxConnections * maxMultiplex * maxUsage; + int maxUsage = 3; + int maxConnections = 4; + int count = 2 * maxConnections * maxMultiplex * maxUsage; List remotePorts = new CopyOnWriteArrayList<>(); scenario.start(new EmptyServerHandler() @@ -229,9 +230,14 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r assertTrue(clientLatch.await(count, TimeUnit.SECONDS)); assertEquals(count, remotePorts.size()); + // Maps {remote_port -> number_of_times_port_was_used}. Map results = remotePorts.stream() .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); - assertEquals(count / maxUsage, results.size(), remotePorts.toString()); - assertEquals(1, results.values().stream().distinct().count(), remotePorts.toString()); + // RoundRobinConnectionPool may open more connections than expected. + // For example with maxUsage=2, requests could be sent to these ports: + // [p1, p2, p3 | p1, p2, p3 | p4, p4, p5 | p6, p5, p7] + // Opening p5 and p6 was delayed, so the opening of p7 was triggered + // to replace p4 while p5 and p6 were busy sending their requests. + assertThat(remotePorts.toString(), count / maxUsage, lessThanOrEqualTo(results.size())); } }