From aff470c770a7f47029400b488e4c8abaaadbb63f Mon Sep 17 00:00:00 2001 From: gregw Date: Wed, 6 Jan 2021 23:06:20 +0100 Subject: [PATCH] Enhanced fix for #5855 revert to using destination queue size for demand --- .../jetty/client/AbstractConnectionPool.java | 72 ++++++++----------- 1 file changed, 28 insertions(+), 44 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java index 9f97ece9e1c4..1c6937b0d4f9 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -25,11 +25,11 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Destination; -import org.eclipse.jetty.util.AtomicBiInteger; import org.eclipse.jetty.util.Attachable; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Pool; @@ -43,15 +43,13 @@ import org.eclipse.jetty.util.thread.Sweeper; import static java.util.stream.Collectors.toCollection; -import static org.eclipse.jetty.util.AtomicBiInteger.getHi; -import static org.eclipse.jetty.util.AtomicBiInteger.getLo; @ManagedObject public abstract class AbstractConnectionPool extends ContainerLifeCycle implements ConnectionPool, Dumpable, Sweeper.Sweepable { private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class); - private final AtomicBiInteger reservations = new AtomicBiInteger(); // hi==pending; lo==demand + private final AtomicInteger pending = new AtomicInteger(); private final HttpDestination destination; private final Callback requester; private final Pool pool; @@ -97,7 +95,7 @@ public CompletableFuture preCreateConnections(int connectionCount) Pool.Entry entry = pool.reserve(); if (entry == null) break; - reservations.add(1, 0); + pending.addAndGet(1); Promise.Completable future = new FutureConnection(entry); futures.add(future); @@ -223,11 +221,11 @@ public Connection acquire() protected Connection acquire(boolean create) { if (LOG.isDebugEnabled()) - LOG.debug("Acquiring create={} on {}", create, this); + LOG.debug("Acquiring on {}", this); Connection connection = activate(); if (connection == null) { - tryCreate(create || isMaximizeConnections()); + tryCreate(create); connection = activate(); } return connection; @@ -246,7 +244,7 @@ protected Connection acquire(boolean create) * reduced by the {@link #getMaxMultiplex()} factor. * */ - protected void tryCreate(boolean demanded) + protected void tryCreate(boolean create) { int connectionCount = getConnectionCount(); if (LOG.isDebugEnabled()) @@ -254,46 +252,31 @@ protected void tryCreate(boolean demanded) // If we have already pending sufficient multiplexed connections, then do not create another int multiplexed = getMaxMultiplex(); - Pool.Entry entry = null; while (true) { - long encoded = reservations.get(); - int pending = getHi(encoded); - int demand = getLo(encoded); - - if (demanded) - demand++; + int pending = this.pending.get(); int supply = pending * multiplexed; + int demand = destination.getQueuedRequestCount() + (create ? 1 : 0); - // Do we need a new connections? - if (supply >= demand) - { - if (!reservations.compareAndSet(encoded, pending, demand)) - continue; - if (entry != null) - entry.release(); + if (!isMaximizeConnections() && supply >= demand) return; - } - // Try to reserve an entry to create - if (entry == null) - entry = pool.reserve(); - if (entry == null) - { - if (!reservations.compareAndSet(encoded, pending, demand)) - continue; - return; - } + if (this.pending.compareAndSet(pending, pending + 1)) + break; + } - if (!reservations.compareAndSet(encoded, pending + 1, demand)) - continue; - // Create the connection - if (LOG.isDebugEnabled()) - LOG.debug("Creating connection {}/{} at {}", connectionCount, getMaxConnectionCount(), entry); - Promise future = new FutureConnection(entry); - destination.newConnection(future); + // Create the connection + Pool.Entry entry = pool.reserve(); + if (entry == null) + { + pending.decrementAndGet(); return; } + + if (LOG.isDebugEnabled()) + LOG.debug("Creating connection {}/{} at {}", connectionCount, getMaxConnectionCount(), entry); + Promise future = new FutureConnection(entry); + destination.newConnection(future); } protected void proceed() @@ -466,14 +449,16 @@ public boolean sweep() @Override public String toString() { - return String.format("%s@%x[c=%d/%d/%d,a=%d,i=%d]", + return String.format("%s@%x[c=%d/%d/%d/%d,a=%d,i=%d,q=%d]", getClass().getSimpleName(), hashCode(), + pending.get(), getPendingConnectionCount(), getConnectionCount(), getMaxConnectionCount(), getActiveConnectionCount(), - getIdleConnectionCount()); + getIdleConnectionCount(), + destination.getQueuedRequestCount()); } private class FutureConnection extends Promise.Completable @@ -490,12 +475,12 @@ public void succeeded(Connection connection) { if (LOG.isDebugEnabled()) LOG.debug("Connection creation succeeded {}: {}", reserved, connection); + pending.decrementAndGet(); if (connection instanceof Attachable) { ((Attachable)connection).setAttachment(reserved); onCreated(connection); - reservations.updateAndGet(encoded -> AtomicBiInteger.encode(getHi(encoded) - 1, Math.max(0, getLo(encoded) - getMaxMultiplex()))); reserved.enable(connection, false); idle(connection, false); complete(null); @@ -504,7 +489,6 @@ public void succeeded(Connection connection) else { // reduce pending on failure and if not multiplexing also reduce demand - reservations.add(-1, getMaxMultiplex() == 1 ? -1 : 0); failed(new IllegalArgumentException("Invalid connection object: " + connection)); } } @@ -515,7 +499,7 @@ public void failed(Throwable x) if (LOG.isDebugEnabled()) LOG.debug("Connection creation failed {}", reserved, x); // reduce pending on failure and if not multiplexing also reduce demand - reservations.add(-1, getMaxMultiplex() == 1 ? -1 : 0); + pending.decrementAndGet(); reserved.remove(); completeExceptionally(x); requester.failed(x);