Skip to content

Commit

Permalink
Enhanced fix for #5855
Browse files Browse the repository at this point in the history
moved pending from Pool to AbstractConnectionPool
  • Loading branch information
gregw committed Jan 6, 2021
1 parent d226896 commit 950b11a
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
Expand All @@ -42,12 +43,15 @@
import org.eclipse.jetty.util.thread.Sweeper;

import static java.util.stream.Collectors.toCollection;
import static org.eclipse.jetty.util.AtomicBiInteger.getHi;
import static org.eclipse.jetty.util.AtomicBiInteger.getLo;

@ManagedObject
public abstract class AbstractConnectionPool extends ContainerLifeCycle implements ConnectionPool, Dumpable, Sweeper.Sweepable
{
private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class);

private final AtomicBiInteger pending = new AtomicBiInteger(); // hi==reserved; lo==demand
private final HttpDestination destination;
private final Callback requester;
private final Pool<Connection> pool;
Expand Down Expand Up @@ -88,24 +92,17 @@ public CompletableFuture<Void> preCreateConnections(int connectionCount)
LOG.debug("Precreating connections {}/{}", connectionCount, getMaxConnectionCount());

List<CompletableFuture<?>> futures = new ArrayList<>();
loop : for (int i = 0; i < connectionCount; i++)
for (int i = 0; i < connectionCount; i++)
{
Pool<Connection>.Entry entry = pool.reserve();
while (entry == null)
{
if (pool.size() >= pool.getMaxEntries())
break loop;
if (pool.getMaxMultiplex() <= 1)
throw new IllegalStateException();
entry = pool.reserve();
}

final Pool<Connection>.Entry reserved = entry;
if (entry == null)
break;
pending.addAndGetHi(1);

Promise.Completable<Connection> future = new FutureConnection(reserved);
Promise.Completable<Connection> future = new FutureConnection(entry);
futures.add(future);
if (LOG.isDebugEnabled())
LOG.debug("Creating connection {}/{} at {}", futures.size() + 1, getMaxConnectionCount(), reserved);
LOG.debug("Creating connection {}/{} at {}", futures.size() + 1, getMaxConnectionCount(), entry);
destination.newConnection(future);
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
Expand Down Expand Up @@ -252,9 +249,30 @@ protected void tryCreate()
if (LOG.isDebugEnabled())
LOG.debug("Try creating connection {}/{} with {} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount());

// If we have already reserved sufficient multiplexed connections, then do not create another
int multiplexed = getMaxMultiplex();
while (true)
{
long encoded = pending.get();
int reserved = getHi(encoded);
int demand = getLo(encoded);

// If we have already reserved enough connections, just increment demand and return
if (reserved * multiplexed > demand && (pending.compareAndSet(encoded, reserved, demand + 1)))
return;

// otherwise increase reservations and demand
if (pending.compareAndSet(encoded, reserved + 1, demand + 1))
break;
}

Pool<Connection>.Entry entry = pool.reserve();
if (entry == null)
{
// pool is full, so decrement reservations and return
pending.addAndGetHi(-1);
return;
}

if (LOG.isDebugEnabled())
LOG.debug("Creating connection {}/{} at {}", connectionCount, getMaxConnectionCount(), entry);
Expand Down Expand Up @@ -461,13 +479,21 @@ public void succeeded(Connection connection)
{
((Attachable)connection).setAttachment(reserved);
onCreated(connection);

while (true)
{
long encoded = pending.get();
if (pending.compareAndSet(encoded, getHi(encoded) - 1, Math.max(0, getLo(encoded) - getMaxMultiplex())))
break;
}
reserved.enable(connection, false);
idle(connection, false);
complete(null);
proceed();
}
else
{
pending.addAndGetHi(-1);
failed(new IllegalArgumentException("Invalid connection object: " + connection));
}
}
Expand All @@ -477,6 +503,7 @@ public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection creation failed {}", reserved, x);
pending.addAndGetHi(-1);
reserved.remove();
completeExceptionally(x);
requester.failed(x);
Expand Down
65 changes: 13 additions & 52 deletions jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public class Pool<T> implements AutoCloseable, Dumpable
private final List<Entry> entries = new CopyOnWriteArrayList<>();

private final int maxEntries;
private final AtomicBiInteger pending = new AtomicBiInteger(); // Lo reserved; Hi demand
private final StrategyType strategyType;

/*
Expand Down Expand Up @@ -137,12 +136,7 @@ public Pool(StrategyType strategyType, int maxEntries, boolean cache)

public int getReservedCount()
{
return pending.getLo();
}

public int getDemand()
{
return pending.getHi();
return (int)entries.stream().filter(Entry::isReserved).count();
}

public int getIdleCount()
Expand Down Expand Up @@ -235,12 +229,9 @@ public Entry reserve(int allotment)
if (space <= 0)
return null;

// The pending count is an AtomicInteger that is only ever incremented here with
// the lock held. Thus the pending count can be reduced immediately after the
// test below, but never incremented. Thus the allotment limit can be enforced.
if (allotment >= 0 && (pending.getLo() * getMaxMultiplex()) >= allotment)
long pending = entries.stream().filter(Entry::isReserved).count();
if (allotment >= 0 && (pending * getMaxMultiplex()) >= allotment)
return null;
pending.addAndGetLo(1);

Entry entry = new Entry();
entries.add(entry);
Expand Down Expand Up @@ -268,27 +259,9 @@ public Entry reserve()
if (closed)
return null;

// Loop to update atomic pending as other mutators do not use lock.
while (true)
{
long encoded = pending.get();
int reserved = AtomicBiInteger.getLo(encoded);
int demand = AtomicBiInteger.getHi(encoded);

// If we have space and there is demand for a new entry
if (entries.size() < maxEntries && reserved * getMaxMultiplex() <= demand)
{
// create a new entry and increment demand
if (pending.compareAndSet(encoded, demand + 1, reserved + 1))
break;
}
else
{
// We either can't create or don't need a new entry, so only increment demand
if (pending.compareAndSet(encoded, demand + 1, reserved))
return null;
}
}
// If we have no space
if (entries.size() >= maxEntries)
return null;

Entry entry = new Entry();
entries.add(entry);
Expand Down Expand Up @@ -512,14 +485,11 @@ public void dump(Appendable out, String indent) throws IOException
@Override
public String toString()
{
long encoded = pending.get();
return String.format("%s@%x[size=%d closed=%s reserved=%d, demand=%d]",
return String.format("%s@%x[size=%d closed=%s]",
getClass().getSimpleName(),
hashCode(),
entries.size(),
closed,
AtomicBiInteger.getLo(encoded),
AtomicBiInteger.getHi(encoded));
closed);
}

public class Entry
Expand Down Expand Up @@ -575,16 +545,6 @@ public boolean enable(T pooled, boolean acquire)
throw new IllegalStateException("Entry already enabled: " + this);
}

while (true)
{
long encoded = pending.get();
int reserved = AtomicBiInteger.getLo(encoded);
int demand = AtomicBiInteger.getHi(encoded);
reserved = reserved - 1;
demand = Math.max(0, demand - getMaxMultiplex());
if (pending.compareAndSet(encoded, demand, reserved))
break;
}
return true;
}

Expand Down Expand Up @@ -685,11 +645,7 @@ boolean tryRemove()

boolean removed = state.compareAndSet(usageCount, -1, multiplexCount, newMultiplexCount);
if (removed)
{
if (usageCount == Integer.MIN_VALUE)
pending.addAndGetLo(-1);
return newMultiplexCount == 0;
}
}
}

Expand All @@ -698,6 +654,11 @@ public boolean isClosed()
return state.getHi() < 0;
}

public boolean isReserved()
{
return state.getHi() == Integer.MIN_VALUE;
}

public boolean isIdle()
{
long encoded = state.get();
Expand Down
14 changes: 0 additions & 14 deletions jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,55 +159,41 @@ public void testReserve(Factory factory)
Pool<String>.Entry e1 = pool.reserve();
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(1));
assertThat(pool.getDemand(), is(1));
assertThat(pool.getIdleCount(), is(0));
assertThat(pool.getInUseCount(), is(0));

// max reservations
assertNull(pool.reserve());
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(1));
assertThat(pool.getDemand(), is(2));
assertThat(pool.getIdleCount(), is(0));
assertThat(pool.getInUseCount(), is(0));

// enable the entry
e1.enable("aaa", false);
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getDemand(), is(0));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(0));

// Reserve another entry
Pool<String>.Entry e2 = pool.reserve();
assertThat(pool.size(), is(2));
assertThat(pool.getReservedCount(), is(1));
assertThat(pool.getDemand(), is(1));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(0));

// remove the reservation
e2.remove();
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getDemand(), is(1));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(0));

// Reserve another entry
Pool<String>.Entry e3 = pool.reserve();
assertThat(pool.size(), is(2));
assertThat(pool.getReservedCount(), is(1));
assertThat(pool.getDemand(), is(2));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(0));

// enable and acquire the entry
e3.enable("bbb", true);
assertThat(pool.size(), is(2));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getDemand(), is(0));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(1));

Expand Down

0 comments on commit 950b11a

Please sign in to comment.