Skip to content

Commit

Permalink
implement connection pool max duration
Browse files Browse the repository at this point in the history
  • Loading branch information
lorban committed Jan 7, 2021
1 parent 403d5ec commit e6ca0d9
Show file tree
Hide file tree
Showing 4 changed files with 364 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
Expand All @@ -54,6 +57,23 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
private final Callback requester;
private final Pool<Connection> pool;
private boolean maximizeConnections;
private volatile long maxDurationNanos = 0L;

private static class EntryHolder
{
private final Pool<Connection>.Entry entry;
private final long creationTimestamp = System.nanoTime();

private EntryHolder(Pool<Connection>.Entry entry)
{
this.entry = Objects.requireNonNull(entry);
}

private boolean isExpired(long timeoutNanos)
{
return System.nanoTime() - creationTimestamp >= timeoutNanos;
}
}

/**
* @deprecated use {@link #AbstractConnectionPool(HttpDestination, int, boolean, Callback)} instead
Expand Down Expand Up @@ -105,6 +125,22 @@ public CompletableFuture<Void> preCreateConnections(int connectionCount)
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

@ManagedAttribute(value = "The maximum duration in milliseconds a connection can be used for before it gets closed")
public long getMaxDuration()
{
return TimeUnit.NANOSECONDS.toMillis(maxDurationNanos);
}

/**
* Set the max usage duration of the pool's entries in milliseconds.
* Values {@code 0} and negative mean that there is no limit.
* @param maxDuration the maximum duration in milliseconds.
*/
public void setMaxDuration(long maxDuration)
{
this.maxDurationNanos = TimeUnit.MILLISECONDS.toNanos(maxDuration);
}

protected int getMaxMultiplex()
{
return pool.getMaxMultiplex();
Expand Down Expand Up @@ -290,16 +326,33 @@ protected void proceed()

protected Connection activate()
{
Pool<Connection>.Entry entry = pool.acquire();
if (entry != null)
while (true)
{
if (LOG.isDebugEnabled())
LOG.debug("Activated {} {}", entry, pool);
Connection connection = entry.getPooled();
acquired(connection);
return connection;
Pool<Connection>.Entry entry = pool.acquire();
if (entry != null)
{
Connection connection = entry.getPooled();

long maxDurationNanos = this.maxDurationNanos;
if (maxDurationNanos > 0L)
{
EntryHolder holder = (EntryHolder)((Attachable)connection).getAttachment();
if (holder != null && holder.isExpired(maxDurationNanos))
{
IO.close(connection);
if (LOG.isDebugEnabled())
LOG.debug("Connection closed due to expiration {} {}", entry, pool);
continue;
}
}

if (LOG.isDebugEnabled())
LOG.debug("Activated {} {}", entry, pool);
acquired(connection);
return connection;
}
return null;
}
return null;
}

@Override
Expand All @@ -308,11 +361,10 @@ public boolean isActive(Connection connection)
if (!(connection instanceof Attachable))
throw new IllegalArgumentException("Invalid connection object: " + connection);
Attachable attachable = (Attachable)connection;
@SuppressWarnings("unchecked")
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
if (entry == null)
EntryHolder holder = (EntryHolder)attachable.getAttachment();
if (holder == null)
return false;
return !entry.isIdle();
return !holder.entry.isIdle();
}

@Override
Expand All @@ -329,13 +381,12 @@ protected boolean deactivate(Connection connection)
if (!(connection instanceof Attachable))
throw new IllegalArgumentException("Invalid connection object: " + connection);
Attachable attachable = (Attachable)connection;
@SuppressWarnings("unchecked")
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
if (entry == null)
EntryHolder holder = (EntryHolder)attachable.getAttachment();
if (holder == null)
return true;
boolean reusable = pool.release(entry);
boolean reusable = pool.release(holder.entry);
if (LOG.isDebugEnabled())
LOG.debug("Released ({}) {} {}", reusable, entry, pool);
LOG.debug("Released ({}) {} {}", reusable, holder.entry, pool);
if (reusable)
return true;
remove(connection);
Expand All @@ -353,14 +404,14 @@ protected boolean remove(Connection connection, boolean force)
if (!(connection instanceof Attachable))
throw new IllegalArgumentException("Invalid connection object: " + connection);
Attachable attachable = (Attachable)connection;
@SuppressWarnings("unchecked")
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
if (entry == null)
EntryHolder holder = (EntryHolder)attachable.getAttachment();
if (holder == null)
return false;
attachable.setAttachment(null);
boolean removed = pool.remove(entry);
boolean removed = pool.remove(holder.entry);
if (removed)
attachable.setAttachment(null);
if (LOG.isDebugEnabled())
LOG.debug("Removed ({}) {} {}", removed, entry, pool);
LOG.debug("Removed ({}) {} {}", removed, holder.entry, pool);
if (removed || force)
{
released(connection);
Expand Down Expand Up @@ -433,20 +484,22 @@ public void dump(Appendable out, String indent) throws IOException
@Override
public boolean sweep()
{
pool.values().stream().filter(entry -> entry.getPooled() instanceof Sweeper.Sweepable).forEach(entry ->
{
Connection connection = entry.getPooled();
if (((Sweeper.Sweepable)connection).sweep())
pool.values().stream()
.map(Pool.Entry::getPooled)
.filter(connection -> connection instanceof Sweeper.Sweepable)
.forEach(connection ->
{
boolean removed = remove(connection);
LOG.warn("Connection swept: {}{}{} from active connections{}{}",
connection,
System.lineSeparator(),
removed ? "Removed" : "Not removed",
System.lineSeparator(),
dump());
}
});
if (((Sweeper.Sweepable)connection).sweep())
{
boolean removed = remove(connection);
LOG.warn("Connection swept: {}{}{} from active connections{}{}",
connection,
System.lineSeparator(),
removed ? "Removed" : "Not removed",
System.lineSeparator(),
dump());
}
});
return false;
}

Expand Down Expand Up @@ -480,7 +533,7 @@ public void succeeded(Connection connection)
LOG.debug("Connection creation succeeded {}: {}", reserved, connection);
if (connection instanceof Attachable)
{
((Attachable)connection).setAttachment(reserved);
((Attachable)connection).setAttachment(new EntryHolder(reserved));
onCreated(connection);
pending.decrementAndGet();
reserved.enable(connection, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
Expand All @@ -50,6 +52,7 @@
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

Expand All @@ -73,6 +76,12 @@ public static Stream<ConnectionPoolFactory> poolsNoRoundRobin()
{
return Stream.of(
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("duplex-maxDuration", destination ->
{
DuplexConnectionPool pool = new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination);
pool.setMaxDuration(10);
return pool;
}),
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)),
new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
);
Expand Down Expand Up @@ -432,6 +441,128 @@ protected void service(String target, org.eclipse.jetty.server.Request jettyRequ
assertThat(connectionPool.getConnectionCount(), Matchers.greaterThanOrEqualTo(count));
}

@Test
public void testMaxDurationConnectionsWithConstrainedPool() throws Exception
{
// ConnectionPools may NOT open a few more connections than expected.

final int maxDuration = 30;
AtomicInteger poolAcquireCounter = new AtomicInteger();
AtomicInteger poolRemoveCounter = new AtomicInteger();
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
{
// Constrain the max pool size to 1.
DuplexConnectionPool pool = new DuplexConnectionPool(destination, 1, destination)
{
@Override
protected void acquired(Connection connection)
{
poolAcquireCounter.incrementAndGet();
}

@Override
protected void removed(Connection connection)
{
poolRemoveCounter.incrementAndGet();
}
};
pool.setMaxDuration(maxDuration);
return pool;
});

startServer(new EmptyServerHandler());

QueuedThreadPool clientThreads = new QueuedThreadPool(3);
clientThreads.setName("client");
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
transport.setConnectionPoolFactory(factory.factory);
client = new HttpClient(transport, null);
client.setExecutor(clientThreads);
client.start();

// Use the connection pool 5 times with a delay that is longer than the max duration in between each time.
for (int i = 0; i < 5; i++)
{
CountDownLatch latch = new CountDownLatch(1);
clientThreads.execute(() -> client.newRequest("localhost", connector.getLocalPort())
.send(result ->
{
if (result.isSucceeded())
latch.countDown();
}));
assertTrue(latch.await(1, TimeUnit.SECONDS));

Thread.sleep(maxDuration * 2);
}

// Check that the pool acquired 5 and removed 4 connections;
// it must be exactly 4 removed b/c each cycle of the loop
// can only open 1 connection as the pool is constrained to
// maximum 1 connection.
assertThat(poolAcquireCounter.get(), Matchers.is(5));
assertThat(poolRemoveCounter.get(), Matchers.is(4));
}

@Test
public void testMaxDurationConnectionsWithUnconstrainedPool() throws Exception
{
// ConnectionPools may open a few more connections than expected.

final int maxDuration = 30;
AtomicInteger poolAcquireCounter = new AtomicInteger();
AtomicInteger poolRemoveCounter = new AtomicInteger();
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
{
DuplexConnectionPool pool = new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)
{
@Override
protected void acquired(Connection connection)
{
poolAcquireCounter.incrementAndGet();
}

@Override
protected void removed(Connection connection)
{
poolRemoveCounter.incrementAndGet();
}
};
pool.setMaxDuration(maxDuration);
return pool;
});

startServer(new EmptyServerHandler());

QueuedThreadPool clientThreads = new QueuedThreadPool(3);
clientThreads.setName("client");
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
transport.setConnectionPoolFactory(factory.factory);
client = new HttpClient(transport, null);
client.setExecutor(clientThreads);
client.start();

// Use the connection pool 5 times with a delay that is longer than the max duration in between each time.
for (int i = 0; i < 5; i++)
{
CountDownLatch latch = new CountDownLatch(1);
clientThreads.execute(() -> client.newRequest("localhost", connector.getLocalPort())
.send(result ->
{
if (result.isSucceeded())
latch.countDown();
}));
assertTrue(latch.await(1, TimeUnit.SECONDS));

Thread.sleep(maxDuration * 2);
}

// Check that the pool acquired 5 and removed at least 4 connections;
// it can be more than 4 removed b/c each cycle of the loop may
// open more than 1 connection as the pool is not constrained.
assertThat(poolAcquireCounter.get(), Matchers.is(5));
assertThat(poolRemoveCounter.get(), Matchers.greaterThanOrEqualTo(4));
}

@ParameterizedTest
@MethodSource("pools")
public void testConnectionMaxUsage(ConnectionPoolFactory factory) throws Exception
Expand Down
5 changes: 3 additions & 2 deletions jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,9 @@ boolean tryRelease()
}

/**
* Try to mark the entry as removed.
* @return true if the entry has to be removed from the containing pool, false otherwise.
* Try to remove the entry by marking it as closed and decrementing the multiplexing counter.
* The multiplexing counter will never go below zero and if it reaches zero, the entry is considered removed.
* @return true if the entry can be removed from the containing pool, false otherwise.
*/
boolean tryRemove()
{
Expand Down
Loading

0 comments on commit e6ca0d9

Please sign in to comment.