Skip to content

Commit

Permalink
Enhanced fix for #5855
Browse files Browse the repository at this point in the history
revert to using destination queue size for demand
  • Loading branch information
gregw committed Jan 6, 2021
1 parent 482c943 commit aff470c
Showing 1 changed file with 28 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
Expand All @@ -43,15 +43,13 @@
import org.eclipse.jetty.util.thread.Sweeper;

import static java.util.stream.Collectors.toCollection;
import static org.eclipse.jetty.util.AtomicBiInteger.getHi;
import static org.eclipse.jetty.util.AtomicBiInteger.getLo;

@ManagedObject
public abstract class AbstractConnectionPool extends ContainerLifeCycle implements ConnectionPool, Dumpable, Sweeper.Sweepable
{
private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class);

private final AtomicBiInteger reservations = new AtomicBiInteger(); // hi==pending; lo==demand
private final AtomicInteger pending = new AtomicInteger();
private final HttpDestination destination;
private final Callback requester;
private final Pool<Connection> pool;
Expand Down Expand Up @@ -97,7 +95,7 @@ public CompletableFuture<Void> preCreateConnections(int connectionCount)
Pool<Connection>.Entry entry = pool.reserve();
if (entry == null)
break;
reservations.add(1, 0);
pending.addAndGet(1);

Promise.Completable<Connection> future = new FutureConnection(entry);
futures.add(future);
Expand Down Expand Up @@ -223,11 +221,11 @@ public Connection acquire()
protected Connection acquire(boolean create)
{
if (LOG.isDebugEnabled())
LOG.debug("Acquiring create={} on {}", create, this);
LOG.debug("Acquiring on {}", this);
Connection connection = activate();
if (connection == null)
{
tryCreate(create || isMaximizeConnections());
tryCreate(create);
connection = activate();
}
return connection;
Expand All @@ -246,54 +244,39 @@ protected Connection acquire(boolean create)
* reduced by the {@link #getMaxMultiplex()} factor.</li>
* </ul>
*/
protected void tryCreate(boolean demanded)
protected void tryCreate(boolean create)
{
int connectionCount = getConnectionCount();
if (LOG.isDebugEnabled())
LOG.debug("Try creating connection {}/{} with {} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount());

// If we have already pending sufficient multiplexed connections, then do not create another
int multiplexed = getMaxMultiplex();
Pool<Connection>.Entry entry = null;
while (true)
{
long encoded = reservations.get();
int pending = getHi(encoded);
int demand = getLo(encoded);

if (demanded)
demand++;
int pending = this.pending.get();
int supply = pending * multiplexed;
int demand = destination.getQueuedRequestCount() + (create ? 1 : 0);

// Do we need a new connections?
if (supply >= demand)
{
if (!reservations.compareAndSet(encoded, pending, demand))
continue;
if (entry != null)
entry.release();
if (!isMaximizeConnections() && supply >= demand)
return;
}

// Try to reserve an entry to create
if (entry == null)
entry = pool.reserve();
if (entry == null)
{
if (!reservations.compareAndSet(encoded, pending, demand))
continue;
return;
}
if (this.pending.compareAndSet(pending, pending + 1))
break;
}

if (!reservations.compareAndSet(encoded, pending + 1, demand))
continue;
// Create the connection
if (LOG.isDebugEnabled())
LOG.debug("Creating connection {}/{} at {}", connectionCount, getMaxConnectionCount(), entry);
Promise<Connection> future = new FutureConnection(entry);
destination.newConnection(future);
// Create the connection
Pool<Connection>.Entry entry = pool.reserve();
if (entry == null)
{
pending.decrementAndGet();
return;
}

if (LOG.isDebugEnabled())
LOG.debug("Creating connection {}/{} at {}", connectionCount, getMaxConnectionCount(), entry);
Promise<Connection> future = new FutureConnection(entry);
destination.newConnection(future);
}

protected void proceed()
Expand Down Expand Up @@ -466,14 +449,16 @@ public boolean sweep()
@Override
public String toString()
{
return String.format("%s@%x[c=%d/%d/%d,a=%d,i=%d]",
return String.format("%s@%x[c=%d/%d/%d/%d,a=%d,i=%d,q=%d]",
getClass().getSimpleName(),
hashCode(),
pending.get(),
getPendingConnectionCount(),
getConnectionCount(),
getMaxConnectionCount(),
getActiveConnectionCount(),
getIdleConnectionCount());
getIdleConnectionCount(),
destination.getQueuedRequestCount());
}

private class FutureConnection extends Promise.Completable<Connection>
Expand All @@ -490,12 +475,12 @@ public void succeeded(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection creation succeeded {}: {}", reserved, connection);
pending.decrementAndGet();
if (connection instanceof Attachable)
{
((Attachable)connection).setAttachment(reserved);
onCreated(connection);

reservations.updateAndGet(encoded -> AtomicBiInteger.encode(getHi(encoded) - 1, Math.max(0, getLo(encoded) - getMaxMultiplex())));
reserved.enable(connection, false);
idle(connection, false);
complete(null);
Expand All @@ -504,7 +489,6 @@ public void succeeded(Connection connection)
else
{
// reduce pending on failure and if not multiplexing also reduce demand
reservations.add(-1, getMaxMultiplex() == 1 ? -1 : 0);
failed(new IllegalArgumentException("Invalid connection object: " + connection));
}
}
Expand All @@ -515,7 +499,7 @@ public void failed(Throwable x)
if (LOG.isDebugEnabled())
LOG.debug("Connection creation failed {}", reserved, x);
// reduce pending on failure and if not multiplexing also reduce demand
reservations.add(-1, getMaxMultiplex() == 1 ? -1 : 0);
pending.decrementAndGet();
reserved.remove();
completeExceptionally(x);
requester.failed(x);
Expand Down

0 comments on commit aff470c

Please sign in to comment.