Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement max duration of HTTP ConnectionPools #5801

Merged
merged 1 commit into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
Expand All @@ -54,6 +57,7 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
private final Callback requester;
private final Pool<Connection> pool;
private boolean maximizeConnections;
private volatile long maxDurationNanos = 0L;

/**
* @deprecated use {@link #AbstractConnectionPool(HttpDestination, int, boolean, Callback)} instead
Expand Down Expand Up @@ -105,6 +109,27 @@ public CompletableFuture<Void> preCreateConnections(int connectionCount)
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

/**
* <p>Get the max usage duration in milliseconds of the pool's connections.
* Values {@code 0} and negative mean that there is no limit.</p>
* <p>This only guarantees that a connection cannot be acquired after the configured
* duration elapses, so that is only enforced when {@link #acquire()} is called.
* If a pool stays completely idle for a duration longer than the value
* returned by this method, the max duration will not be enforced.
* It's up to the idle timeout mechanism (see {@link HttpClient#getIdleTimeout()})
* to handle closing idle connections.</p>
*/
@ManagedAttribute(value = "The maximum duration in milliseconds a connection can be used for before it gets closed")
public long getMaxDuration()
lorban marked this conversation as resolved.
Show resolved Hide resolved
{
return TimeUnit.NANOSECONDS.toMillis(maxDurationNanos);
}

public void setMaxDuration(long maxDurationInMs)
{
this.maxDurationNanos = TimeUnit.MILLISECONDS.toNanos(maxDurationInMs);
}

protected int getMaxMultiplex()
{
return pool.getMaxMultiplex();
Expand Down Expand Up @@ -290,16 +315,35 @@ protected void proceed()

protected Connection activate()
{
Pool<Connection>.Entry entry = pool.acquire();
if (entry != null)
while (true)
{
if (LOG.isDebugEnabled())
LOG.debug("Activated {} {}", entry, pool);
Connection connection = entry.getPooled();
acquired(connection);
return connection;
Pool<Connection>.Entry entry = pool.acquire();
if (entry != null)
{
Connection connection = entry.getPooled();

long maxDurationNanos = this.maxDurationNanos;
if (maxDurationNanos > 0L)
{
EntryHolder holder = (EntryHolder)((Attachable)connection).getAttachment();
if (holder.isExpired(maxDurationNanos))
{
boolean canClose = remove(connection, true);
if (canClose)
IO.close(connection);
if (LOG.isDebugEnabled())
LOG.debug("Connection removed{} due to expiration {} {}", (canClose ? " and closed" : ""), entry, pool);
continue;
}
}

if (LOG.isDebugEnabled())
LOG.debug("Activated {} {}", entry, pool);
acquired(connection);
return connection;
}
return null;
}
return null;
}

@Override
Expand All @@ -308,11 +352,10 @@ public boolean isActive(Connection connection)
if (!(connection instanceof Attachable))
throw new IllegalArgumentException("Invalid connection object: " + connection);
Attachable attachable = (Attachable)connection;
@SuppressWarnings("unchecked")
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
if (entry == null)
EntryHolder holder = (EntryHolder)attachable.getAttachment();
if (holder == null)
return false;
return !entry.isIdle();
return !holder.entry.isIdle();
}

@Override
Expand All @@ -329,13 +372,12 @@ protected boolean deactivate(Connection connection)
if (!(connection instanceof Attachable))
throw new IllegalArgumentException("Invalid connection object: " + connection);
Attachable attachable = (Attachable)connection;
@SuppressWarnings("unchecked")
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
if (entry == null)
EntryHolder holder = (EntryHolder)attachable.getAttachment();
if (holder == null)
return true;
boolean reusable = pool.release(entry);
boolean reusable = pool.release(holder.entry);
if (LOG.isDebugEnabled())
LOG.debug("Released ({}) {} {}", reusable, entry, pool);
LOG.debug("Released ({}) {} {}", reusable, holder.entry, pool);
if (reusable)
return true;
remove(connection);
Expand All @@ -353,14 +395,14 @@ protected boolean remove(Connection connection, boolean force)
if (!(connection instanceof Attachable))
throw new IllegalArgumentException("Invalid connection object: " + connection);
Attachable attachable = (Attachable)connection;
@SuppressWarnings("unchecked")
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
if (entry == null)
EntryHolder holder = (EntryHolder)attachable.getAttachment();
if (holder == null)
return false;
attachable.setAttachment(null);
boolean removed = pool.remove(entry);
boolean removed = pool.remove(holder.entry);
if (removed)
attachable.setAttachment(null);
if (LOG.isDebugEnabled())
LOG.debug("Removed ({}) {} {}", removed, entry, pool);
LOG.debug("Removed ({}) {} {}", removed, holder.entry, pool);
if (removed || force)
{
released(connection);
Expand Down Expand Up @@ -433,20 +475,22 @@ public void dump(Appendable out, String indent) throws IOException
@Override
public boolean sweep()
{
pool.values().stream().filter(entry -> entry.getPooled() instanceof Sweeper.Sweepable).forEach(entry ->
{
Connection connection = entry.getPooled();
if (((Sweeper.Sweepable)connection).sweep())
pool.values().stream()
.map(Pool.Entry::getPooled)
.filter(connection -> connection instanceof Sweeper.Sweepable)
.forEach(connection ->
{
boolean removed = remove(connection);
LOG.warn("Connection swept: {}{}{} from active connections{}{}",
connection,
System.lineSeparator(),
removed ? "Removed" : "Not removed",
System.lineSeparator(),
dump());
}
});
if (((Sweeper.Sweepable)connection).sweep())
{
boolean removed = remove(connection);
LOG.warn("Connection swept: {}{}{} from active connections{}{}",
connection,
System.lineSeparator(),
removed ? "Removed" : "Not removed",
System.lineSeparator(),
dump());
}
});
return false;
}

Expand Down Expand Up @@ -480,7 +524,7 @@ public void succeeded(Connection connection)
LOG.debug("Connection creation succeeded {}: {}", reserved, connection);
if (connection instanceof Attachable)
{
((Attachable)connection).setAttachment(reserved);
((Attachable)connection).setAttachment(new EntryHolder(reserved));
onCreated(connection);
pending.decrementAndGet();
reserved.enable(connection, false);
Expand All @@ -507,4 +551,20 @@ public void failed(Throwable x)
requester.failed(x);
}
}

private static class EntryHolder
{
private final Pool<Connection>.Entry entry;
private final long creationTimestamp = System.nanoTime();

private EntryHolder(Pool<Connection>.Entry entry)
{
this.entry = Objects.requireNonNull(entry);
}

private boolean isExpired(long timeoutNanos)
{
return System.nanoTime() - creationTimestamp >= timeoutNanos;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
Expand All @@ -50,6 +52,7 @@
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

Expand All @@ -73,6 +76,12 @@ public static Stream<ConnectionPoolFactory> poolsNoRoundRobin()
{
return Stream.of(
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("duplex-maxDuration", destination ->
{
DuplexConnectionPool pool = new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination);
pool.setMaxDuration(10);
return pool;
}),
lorban marked this conversation as resolved.
Show resolved Hide resolved
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)),
new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
);
Expand Down Expand Up @@ -432,6 +441,116 @@ protected void service(String target, org.eclipse.jetty.server.Request jettyRequ
assertThat(connectionPool.getConnectionCount(), Matchers.greaterThanOrEqualTo(count));
}

@Test
public void testMaxDurationConnectionsWithConstrainedPool() throws Exception
{
// ConnectionPool may NOT open more connections than expected because
// it is constrained to a single connection in this test.

final int maxConnections = 1;
final int maxDuration = 30;
AtomicInteger poolCreateCounter = new AtomicInteger();
AtomicInteger poolRemoveCounter = new AtomicInteger();
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
{
// Constrain the max pool size to 1.
lorban marked this conversation as resolved.
Show resolved Hide resolved
DuplexConnectionPool pool = new DuplexConnectionPool(destination, maxConnections, destination)
{
@Override
protected void onCreated(Connection connection)
{
poolCreateCounter.incrementAndGet();
}

@Override
protected void removed(Connection connection)
{
poolRemoveCounter.incrementAndGet();
}
};
pool.setMaxDuration(maxDuration);
return pool;
});

startServer(new EmptyServerHandler());

HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
transport.setConnectionPoolFactory(factory.factory);
client = new HttpClient(transport, null);
client.start();

// Use the connection pool 5 times with a delay that is longer than the max duration in between each time.
for (int i = 0; i < 5; i++)
{
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.send();
assertThat(response.getStatus(), Matchers.is(200));

Thread.sleep(maxDuration * 2);
}

// Check that the pool created 5 and removed 4 connections;
// it must be exactly 4 removed b/c each cycle of the loop
// can only open 1 connection as the pool is constrained to
// maximum 1 connection.
assertThat(poolCreateCounter.get(), Matchers.is(5));
assertThat(poolRemoveCounter.get(), Matchers.is(4));
}

@Test
public void testMaxDurationConnectionsWithUnconstrainedPool() throws Exception
{
// ConnectionPools may open a few more connections than expected.

final int maxDuration = 30;
AtomicInteger poolCreateCounter = new AtomicInteger();
AtomicInteger poolRemoveCounter = new AtomicInteger();
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
{
DuplexConnectionPool pool = new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)
{
@Override
protected void onCreated(Connection connection)
{
poolCreateCounter.incrementAndGet();
}

@Override
protected void removed(Connection connection)
{
poolRemoveCounter.incrementAndGet();
}
};
pool.setMaxDuration(maxDuration);
return pool;
});

startServer(new EmptyServerHandler());

HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
transport.setConnectionPoolFactory(factory.factory);
client = new HttpClient(transport, null);
client.start();

// Use the connection pool 5 times with a delay that is longer than the max duration in between each time.
for (int i = 0; i < 5; i++)
{
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.send();
assertThat(response.getStatus(), Matchers.is(200));

Thread.sleep(maxDuration * 2);
}

// Check that the pool created 5 and removed at least 4 connections;
// it can be more than 4 removed b/c each cycle of the loop may
// open more than 1 connection as the pool is not constrained.
assertThat(poolCreateCounter.get(), Matchers.is(5));
assertThat(poolRemoveCounter.get(), Matchers.greaterThanOrEqualTo(4));
}

@ParameterizedTest
@MethodSource("pools")
public void testConnectionMaxUsage(ConnectionPoolFactory factory) throws Exception
Expand Down
Loading